In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DMA').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/12 08:31:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load data. 
data = spark.read.csv('BDAS_Iteration_Dataset/final_df.csv',inferSchema=True,header=True)
data.show()

+---+---+------+-------+----------+------------------+
|wid|age|gender|marital|parenthood|predicted_category|
+---+---+------+-------+----------+------------------+
|  1| 37|     m|married|         y|       achievement|
|  2| 29|     m|married|         y|       achievement|
|  3| 25|     m| single|         n|       achievement|
|  4| 32|     m|married|         y|       achievement|
|  5| 29|     m|married|         y|       achievement|
|  6| 35|     m|married|         y|       achievement|
|  7| 34|     m|married|         y|       achievement|
|  8| 29|     m| single|         n|       achievement|
| 10| 27|     m| single|         n|       achievement|
| 12| 25|     f| single|         n|       achievement|
| 13| 45|     m|married|         y|       achievement|
| 14| 25|     m|married|         y|       achievement|
| 15| 27|     m|married|         y|       achievement|
| 16| 35|     m| single|         n|       achievement|
| 17| 30|     m| single|         n|       achievement|
| 18| 30| 

In [3]:
data.printSchema()

root
 |-- wid: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- parenthood: string (nullable = true)
 |-- predicted_category: string (nullable = true)



In [4]:
data.columns

['wid', 'age', 'gender', 'marital', 'parenthood', 'predicted_category']

In [5]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Create StringIndexers for categorical predictor variables
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
marital_indexer = StringIndexer(inputCol="marital", outputCol="marital_index")
parenthood_indexer = StringIndexer(inputCol="parenthood", outputCol="parenthood_index")

# Create a StringIndexer for the target variable
target_indexer = StringIndexer(inputCol="predicted_category", outputCol="label")

# Assemble the predictor variables into a single features column
assembler = VectorAssembler(
    inputCols=["gender_index", "marital_index", "parenthood_index"],
    outputCol="features"
)

# Define a pipeline to execute the transformations
pipeline = Pipeline(stages=[gender_indexer, marital_indexer, parenthood_indexer, target_indexer, assembler])

# Fit and transform the data using the pipeline
model = pipeline.fit(data)
data = model.transform(data)

# Select the relevant columns for your machine learning model
final_data = data.select("label", "features")

# Show the first few rows of the final_data DataFrame
final_data.show()

                                                                                

+-----+-------------+
|label|     features|
+-----+-------------+
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|    (3,[],[])|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|    (3,[],[])|
|  0.0|    (3,[],[])|
|  0.0|[1.0,0.0,0.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|    (3,[],[])|
|  0.0|    (3,[],[])|
|  0.0|    (3,[],[])|
|  0.0|[0.0,1.0,0.0]|
|  0.0|[0.0,1.0,1.0]|
|  0.0|[0.0,0.0,1.0]|
|  0.0|    (3,[],[])|
+-----+-------------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import col
data.select("gender", "gender_index").distinct().orderBy("gender_index").show()
data.select("marital", "marital_index").distinct().orderBy("marital_index").show()
data.select("parenthood", "parenthood_index").distinct().orderBy("parenthood_index").show()
data.select("predicted_category", "label").distinct().orderBy("label").show()

+------+------------+
|gender|gender_index|
+------+------------+
|     m|         0.0|
|     f|         1.0|
+------+------------+

+-------+-------------+
|marital|marital_index|
+-------+-------------+
| single|          0.0|
|married|          1.0|
+-------+-------------+

+----------+----------------+
|parenthood|parenthood_index|
+----------+----------------+
|         n|             0.0|
|         y|             1.0|
+----------+----------------+

+------------------+-----+
|predicted_category|label|
+------------------+-----+
|       achievement|  0.0|
|         affection|  1.0|
+------------------+-----+



In [7]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LinearSVC,LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Split the data into training and testing sets
train_data, test_data = final_data.randomSplit([0.7, 0.3])

# List of models
models = {
    "Random Forest": RandomForestClassifier(featuresCol="features", labelCol="label"),
    "Gradient Boosted Trees": GBTClassifier(featuresCol="features", labelCol="label"),
    "Support Vector Machines": LinearSVC(featuresCol="features", labelCol="label"),
    "LogisticRegression": LogisticRegression(featuresCol="features", labelCol="label")
}

# Train and evaluate each model
for model_name, model in models.items():
    trained_model = model.fit(train_data)
    predictions = trained_model.transform(test_data)
    
   # Calculate AUC
    binary_evaluator = BinaryClassificationEvaluator()
    auc = binary_evaluator.evaluate(predictions)
    
    # Calculate Accuracy
    multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = multiclass_evaluator.evaluate(predictions)
    
    print(f"{model_name} AUC: {auc}")
    print(f"{model_name} Accuracy: {accuracy}")

[Stage 30:>                                                         (0 + 1) / 1]                                                                                

Random Forest AUC: 0.5
Random Forest Accuracy: 0.8018698309960446


23/10/12 08:32:04 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/10/12 08:32:04 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


Gradient Boosted Trees AUC: 0.6414342451148747
Gradient Boosted Trees Accuracy: 0.8018698309960446


23/10/12 08:32:08 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/10/12 08:32:10 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/10/12 08:32:11 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


Support Vector Machines AUC: 0.5808977562198367
Support Vector Machines Accuracy: 0.8018698309960446
LogisticRegression AUC: 0.6421886826235218
LogisticRegression Accuracy: 0.8018698309960446


In [8]:
import os
from pyspark.ml.classification import GBTClassificationModel, LogisticRegressionModel

# Train all the models
trained_models = {}
for model_name, model in models.items():
    trained_model = model.fit(train_data)
    trained_models[model_name] = trained_model

# Get the current working directory
current_dir = os.getcwd()

# Save the GBT and LR models in the current working directory
for model_name, trained_model in trained_models.items():
    if model_name in ["Gradient Boosted Trees", "LogisticRegression"]:
        model_path = os.path.join(current_dir, f"{model_name}_model") 
        trained_model.write().overwrite().save(model_path)


23/10/12 08:32:25 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/10/12 08:32:27 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/10/12 08:32:28 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
                                                                                