<font color="red">Step 1.</font>
<font color="blue">Setting up PySpark.</font>

In [None]:
!pip install pyspark py4j



<font color="blue"> Importing the SparkSession class. </font> <br>
<font color="blue"> SparkSession.builder creates a builder object used to configure the SparkSession. </font> <br>
<font color="blue">.appName("Readingtextfile") sets the name of the Spark application, which is useful for identifying the job in the Spark UI. </font> <br>
<font color="blue"> .getOrCreate() returns the existing Spark session if there is one; otherwise it creates a new one.</font> <br>

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Readingtextfile").getOrCreate()

<font color="red">Step 2: Data collection</font>
<font color="blue"> Uploading the text data file. Colab does not keep uploaded files between sessions, so the file has to be uploaded every time. Alternatively, the file can be kept in Google Drive. </font> <br>

In [None]:
from google.colab import files
rdd_data = files.upload()  # returns a dict mapping each uploaded file name to its bytes

Saving amazon_cells_labelled.txt to amazon_cells_labelled.txt


<font color="red">Step 3.</font>
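<font color="blue">Reading the data file: spark.read.text() reads the uploaded file and returns a DataFrame with a single string column named value, one row per line of the file. The variable is called rdd here, but it holds a DataFrame, not an RDD.</font>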

In [None]:
rdd = spark.read.text("/content/amazon_cells_labelled.txt")

<font color="blue">Note that spark.read.text() returns a DataFrame, not an RDD. Each Row holds one line of the input file as a string in the value column, with the review text and its label still joined by a tab. The first ten rows are displayed below:</font>

In [None]:
rdd.take(10)

[Row(value='So there is no way for me to plug it in here in the US unless I go by a converter.\t0'),
 Row(value='Good case, Excellent value.\t1'),
 Row(value='Great for the jawbone.\t1'),
 Row(value='Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!\t0'),
 Row(value='The mic is great.\t1'),
 Row(value='I have to jiggle the plug to get it to line up right to get decent volume.\t0'),
 Row(value='If you have several dozen or several hundred contacts, then imagine the fun of sending each of them one by one.\t0'),
 Row(value='If you are Razr owner...you must have this!\t1'),
 Row(value='Needless to say, I wasted my money.\t0'),
 Row(value='What a waste of money and time!.\t0')]

In [None]:
from pyspark.sql.functions import split,col

<font color="blue"> The DataFrame has a single column named "value", which is the default column name for spark.read.text(). The split() function breaks each string into an array of substrings, using the tab character \t as the delimiter. The first element of that array, getItem(0), goes into a new column named text. The second element, getItem(1), is cast to an integer and goes into a new column named label. Finally, the original value column is dropped. </font>

In [None]:
# First, create the parsed_df with both text and label
parsed_df = rdd.withColumn("text", split(col("value"), "\t").getItem(0)) \
               .withColumn("label", split(col("value"), "\t").getItem(1).cast("integer")) \
               .drop("value")
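
<font color="blue">As a minimal plain-Python sketch of what the split does to a single row (this is not Spark code; the sample line is taken from the output above):</font>

```python
# One raw line as it appears in the "value" column.
line = "Good case, Excellent value.\t1"

parts = line.split("\t")   # same delimiter as split(col("value"), "\t")
text = parts[0]            # corresponds to getItem(0)
label = int(parts[1])      # corresponds to getItem(1).cast("integer")

print(repr(text), label)
```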

<font color="red">Step 3.1.</font>
<font color="blue">Processing data before tokenization</font>

In [None]:
from pyspark.sql.functions import regexp_replace, col, lower

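<font color="blue">PySpark provides the regexp_replace function to replace every substring of a string column that matches a regular expression pattern. The text is first converted to lowercase. The pattern [^a-zA-Z\s] then matches any character that is not a letter or whitespace, so replacing the matches with an empty string removes digits, punctuation, and other special characters. A second regexp_replace collapses runs of whitespace into a single space.</font>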

In [None]:
cleaned_df = parsed_df.withColumn("clean_text",
    regexp_replace(  # Remove special chars and numbers
        lower(       # Convert to lowercase
            col("text")
        ),
        "[^a-zA-Z\\s]", ""  # Keep only letters, whitespace
    )
).withColumn(
    "clean_text",
    regexp_replace(col("clean_text"), "\\s+", " ")  # Replace multiple spaces with one
)
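
<font color="blue">The same two regular-expression passes can be tried on a single string with Python's re module. This is only a sketch: it assumes Python's re and Spark's regexp_replace treat these two patterns the same way for plain ASCII text, and clean_text is a hypothetical helper name. The sample sentence is taken from the rows displayed earlier.</font>

```python
import re

def clean_text(text: str) -> str:
    """Mirror the notebook's cleaning: lowercase, keep letters/whitespace, collapse spaces."""
    lowered = text.lower()
    letters_only = re.sub(r"[^a-zA-Z\s]", "", lowered)   # drop digits and punctuation
    return re.sub(r"\s+", " ", letters_only)              # collapse repeated whitespace

sample = "Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!"
cleaned = clean_text(sample)
print(cleaned)
```

<font color="blue">Because the full stop between "minutes" and "MAJOR" is deleted rather than replaced with a space, the two words are joined into a single token, "minutesmajor". Replacing the matched characters with a space instead of an empty string would avoid that.</font>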

In [None]:
from pyspark.ml.feature import Tokenizer

<font color="red">Step 4.</font>
<font color="blue"> Tokenize the data.</font>

In [None]:
tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
tokenized_df = tokenizer.transform(cleaned_df)

<font color="red">Step 5.</font>
<font color="blue"> Removing stopwords.</font>

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stopwords)
filtered_df = remover.transform(tokenized_df)

<font color="red">Step 6.</font>
<font color="blue"> Applying Word2Vec to convert tokens to numerical feature vectors</font>

In [None]:
from pyspark.ml.feature import VectorAssembler, Word2Vec

In [None]:
word2vec = Word2Vec(vectorSize=100,minCount=5,inputCol="filtered_words",outputCol="word2vec_features")

In [None]:
w2v_model = word2vec.fit(filtered_df)

In [None]:
result_df = w2v_model.transform(filtered_df)
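
<font color="blue">Spark's documentation describes the Word2Vec model's transform step as turning each document into the average of its word vectors. The plain-Python sketch below illustrates that idea with made-up 3-dimensional vectors; document_vector is a hypothetical helper. Dividing by the total token count, including tokens that are missing from the vocabulary (for example words seen fewer than minCount times), reflects my understanding of Spark's behaviour and should be checked against the installed version.</font>

```python
# Hypothetical 3-dimensional word vectors; with vectorSize=100 each word would have 100 values.
word_vectors = {
    "waste": [0.2, -0.4, 0.6],
    "money": [0.4, 0.0, -0.2],
}

def document_vector(tokens, vectors, size=3):
    """Average word vectors over all tokens; unknown tokens contribute nothing to the sum."""
    if not tokens:
        return [0.0] * size
    total = [0.0] * size
    for token in tokens:
        for i, value in enumerate(vectors.get(token, [0.0] * size)):
            total[i] += value
    # Assumption: the divisor is the full token count, not only the known tokens.
    return [value / len(tokens) for value in total]

doc = document_vector(["waste", "money", "razr"], word_vectors)
print(doc)
```

<font color="blue">Under this reading, a review whose words are all below minCount would be represented by an all-zero vector, which gives the classifier nothing to work with for that row.</font>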

<font color="red">Step 7.</font>
<font color="blue"> Preparing the feature vector</font>

In [None]:
assembler = VectorAssembler(inputCols=["word2vec_features"],outputCol="features")

In [None]:
result_df = assembler.transform(result_df)

<font color="red">Step 8.</font>
<font color="blue">  Choose a classification algorithm - Random Forest Classifier</font>

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=5,
    seed=42
)

<font color="red">Step 9.</font>
<font color="blue">  Creating the pipeline</font>

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[rf])

<font color="red">Step 10.</font>
<font color="blue">  Split the data into training and test sets</font>

In [None]:
train_data, test_data = result_df.randomSplit([0.8, 0.2], seed=42)

<font color="red">Step 10.1.</font>
<font color="blue">  Train the model</font>

In [None]:
model = pipeline.fit(train_data)

<font color="red">Step 11.</font>
<font color="blue">  Make prediction</font>

In [None]:
# Make predictions
predictions = model.transform(test_data)

<font color="red">Step 12.</font>
<font color="blue">  Evaluate the model</font>

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

<font color="red">Step 12.1.</font>
<font color="blue">  Evaluate the model: binary classification evaluator (areaUnderROC is the default metric and is set explicitly here) </font>

In [None]:
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

<font color="red">Step 12.2.</font>
<font color="blue">  Evaluate the model: Multiclass evaluator for accuracy, precision, recall, etc. </font>

In [None]:
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

<font color="red">Step 12.3.</font>
<font color="blue">  Evaluate the model: Calculate evaluation metrics  </font>

In [None]:
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

<font color="red">Step 12.4.</font>
<font color="blue">  Printing the AUC, accuracy, precision, recall, and F1 values  </font>

In [None]:
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7201
Accuracy: 0.6605
Precision: 0.6627
Recall: 0.6605
F1 Score: 0.6573
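
<font color="blue">Recall and Accuracy print the same value above. That is expected: weightedRecall averages the per-class recall weighted by each class's share of the data, which algebraically reduces to the overall accuracy. The plain-Python sketch below checks this on a small made-up set of labels and predictions (not data from this notebook); weighted_metrics is a hypothetical helper.</font>

```python
from collections import Counter

def weighted_metrics(labels, preds):
    """Return (accuracy, weighted precision, weighted recall) for two equal-length lists."""
    n = len(labels)
    support = Counter(labels)  # number of true examples per class
    accuracy = sum(l == p for l, p in zip(labels, preds)) / n
    w_precision = 0.0
    w_recall = 0.0
    for cls, count in support.items():
        tp = sum(1 for l, p in zip(labels, preds) if l == cls and p == cls)
        predicted = sum(1 for p in preds if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        w_precision += (count / n) * precision
        w_recall += (count / n) * recall
    return accuracy, w_precision, w_recall

# Made-up example: five positive and three negative reviews.
labels = [1, 1, 1, 1, 1, 0, 0, 0]
preds = [1, 1, 1, 0, 0, 0, 0, 1]
accuracy, w_precision, w_recall = weighted_metrics(labels, preds)
print(accuracy, w_precision, w_recall)
```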


<font color="red">Step 13.</font>
<font color="blue"> Investigating how various factors affect the model's performance, and what could be done to improve it.  </font>

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

<font color="red">Step 13.1.</font>
<font color="blue">Cross-validation for hyperparameter tuning  </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()

In [None]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
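
<font color="blue">To get a feel for the cost of this search, the sketch below counts the parameter combinations in the grid and the number of model fits that 3-fold cross-validation implies. It uses only itertools, not Spark, and the dictionary simply restates the grid defined above.</font>

```python
from itertools import product

# The same values as the ParamGridBuilder grid above.
grid = {
    "numTrees": [50, 100],
    "maxDepth": [3, 5],
    "impurity": ["gini", "entropy"],
}
num_folds = 3

# Every combination of one value per parameter.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_cv = len(combinations) * num_folds
total_fits = fits_during_cv + 1  # plus the final refit of the best combination on all training data

print(len(combinations), fits_during_cv, total_fits)
```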

In [None]:
cv_model = crossval.fit(train_data)

In [None]:
best_model = cv_model.bestModel

In [None]:
cv_prediction = best_model.transform(test_data)

In [None]:
cv_auc = binary_evaluator.evaluate(cv_prediction)

In [None]:
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.7201
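
<font color="blue">The test AUC alone does not show which parameter combination cross-validation selected. One possible way to inspect that is to pair each parameter map with its mean cross-validation metric. The helper below is a sketch with a hypothetical name, rank_param_maps; the Param class and the metric values in the demonstration are stand-ins, not results from this notebook.</font>

```python
def rank_param_maps(param_maps, avg_metrics):
    """Pair each parameter map with its mean cross-validation metric, best first."""
    rows = []
    for param_map, metric in zip(param_maps, avg_metrics):
        # Keys are Param-like objects that expose a .name attribute.
        settings = {param.name: value for param, value in param_map.items()}
        rows.append((metric, settings))
    return sorted(rows, key=lambda row: row[0], reverse=True)

class Param:
    """Stand-in for a Spark Param object; only the .name attribute is used."""
    def __init__(self, name):
        self.name = name

num_trees, max_depth = Param("numTrees"), Param("maxDepth")
param_maps = [{num_trees: 50, max_depth: 3}, {num_trees: 100, max_depth: 5}]
avg_metrics = [0.61, 0.64]  # made-up placeholder values for the demonstration

ranked = rank_param_maps(param_maps, avg_metrics)
for metric, settings in ranked:
    print(f"{metric:.4f}", settings)
```

<font color="blue">In the notebook, the call would be rank_param_maps(crossval.getEstimatorParamMaps(), cv_model.avgMetrics).</font>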


<font color="red">Step 13.2.</font>
<font color="blue">Hyperparameter tuning: Effect of the number of trees, grid 100-150  </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [100, 150]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()

In [None]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)

In [None]:
cv_model = crossval.fit(train_data)

In [None]:
best_model = cv_model.bestModel

In [None]:
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.6917


<font color="red">Step 13.3.</font>
<font color="blue">Hyperparameter tuning: Effect of the number of trees, grid 150-200 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [150, 200]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.7111


<font color="red">Step 13.4.</font>
<font color="blue">Hyperparameter tuning: Effect of the number of trees, grid 200-250 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [200, 250]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.6926


<font color="red">Step 13.5.</font>
<font color="blue">Hyperparameter tuning: Effect of the number of trees, grid 250-300 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [250, 300]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.7019


<font color="red">Step 13.6.</font>
<font color="blue">Hyperparameter tuning: Effect of the number of trees, grid 300-400 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [300, 400]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.6998


<font color="red">Step 13.7.</font>
<font color="blue">Hyperparameter tuning: Effect of max depth, grid 5-10 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.7347


<font color="red">Step 13.8.</font>
<font color="blue">Hyperparameter tuning: Effect of max depth, grid 20-30 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [20, 30]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

Best model AUC on test data = 0.6989


<font color="red">Step 13.9.</font>
<font color="blue">Hyperparameter tuning: Effect of max depth, grid 30-40 </font>

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [30, 40]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=binary_evaluator,  # Optimizing for AUC
    numFolds=3,
    seed=42
)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel
cv_prediction = best_model.transform(test_data)
cv_auc = binary_evaluator.evaluate(cv_prediction)
print(f"Best model AUC on test data = {cv_auc:.4f}")

IllegalArgumentException: RandomForestClassifier_7799b4b5aee6 parameter maxDepth given invalid value 40.
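
<font color="blue">The error arises because 40 is outside the range of values that maxDepth accepts. Spark's documentation for tree models gives an upper bound of 30; treat that number as an assumption to confirm for the installed version. A small sketch of filtering candidate depths before building the grid, with valid_depths as a hypothetical helper:</font>

```python
# Upper bound taken from Spark's tree-model documentation; confirm it for your version.
MAX_SUPPORTED_DEPTH = 30

def valid_depths(candidates, upper=MAX_SUPPORTED_DEPTH):
    """Keep only depths inside the accepted range [0, upper]."""
    return [depth for depth in candidates if 0 <= depth <= upper]

depths = valid_depths([30, 40])
print(depths)
```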

<font color="red">Step 13.10.</font>
<font color="blue">Changing random forest parameters: numTrees=100, maxDepth=10 </font>

In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    seed=42
)

In [None]:
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7347
Accuracy: 0.6728
Precision: 0.6754
Recall: 0.6728
F1 Score: 0.6697
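
<font color="blue">The same five evaluation calls are repeated for every model in the remaining cells. One option is to wrap them in a helper. The sketch below uses a hypothetical name, evaluate_predictions, and demonstrates it with a stand-in evaluator that returns a placeholder value so that the block runs without Spark; in the notebook, the existing binary_evaluator and multi_evaluator would be passed instead.</font>

```python
# Display name -> metric name understood by the multiclass evaluator in this notebook.
METRICS = {
    "Accuracy": "accuracy",
    "Precision": "weightedPrecision",
    "Recall": "weightedRecall",
    "F1 Score": "f1",
}

def evaluate_predictions(predictions, binary_evaluator, multi_evaluator):
    """Collect AUC plus the four multiclass metrics into one dictionary."""
    results = {"Area Under ROC (AUC)": binary_evaluator.evaluate(predictions)}
    for label, metric in METRICS.items():
        results[label] = multi_evaluator.evaluate(
            predictions, {multi_evaluator.metricName: metric}
        )
    return results

class StubEvaluator:
    """Stand-in with the same evaluate() shape as the Spark evaluators; returns a placeholder."""
    metricName = "metricName"

    def __init__(self):
        self.requested = []

    def evaluate(self, predictions, params=None):
        self.requested.append(params)
        return 0.0  # placeholder, not a real metric

binary, multi = StubEvaluator(), StubEvaluator()
results = evaluate_predictions(None, binary, multi)
for name, value in results.items():
    print(f"{name}: {value:.4f}")
```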


<font color="red">Step 13.11.</font>
<font color="blue">Changing random forest parameters: numTrees=100, maxDepth=12 </font>

In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=12,
    seed=42
)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7304
Accuracy: 0.6852
Precision: 0.6896
Recall: 0.6852
F1 Score: 0.6813


<font color="red">Step 13.12.</font>
<font color="blue">Changing random forest parameters: numTrees=50 and then numTrees=150, both with maxDepth=12 </font>

In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=12,
    seed=42
)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.6999
Accuracy: 0.6543
Precision: 0.6541
Recall: 0.6543
F1 Score: 0.6541


In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=150,
    maxDepth=12,
    seed=42
)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7073
Accuracy: 0.6605
Precision: 0.6668
Recall: 0.6605
F1 Score: 0.6542


<font color="red">Step 13.13.</font>
<font color="blue">Changing random forest parameters: numTrees=100, maxDepth=14 </font>

In [None]:
# Choose a classification algorithm - Random Forest Classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=14,
    seed=42
)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7305
Accuracy: 0.6728
Precision: 0.6767
Recall: 0.6728
F1 Score: 0.6688


<font color="red">Step 14</font>
<font color="blue">Different model: Logistic regression classifier  </font>

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Create Logistic Regression classifier
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    family="binomial",  # Explicitly set for binary classification
    elasticNetParam=0.8,  # Elastic-net mix: 0 is pure L2 (Ridge), 1 is pure L1 (Lasso); 0.8 leans towards L1
    regParam=0.1,       # Regularization strength
    maxIter=100,
    tol=1e-6
)


In [None]:
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.6322
Accuracy: 0.5864
Precision: 0.5862
Recall: 0.5864
F1 Score: 0.5825


<font color="red">Step 15</font>
<font color="blue">Different model: Gradient-boosted trees classifier  </font>

In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    maxDepth=5,
    stepSize=0.01,
    seed=42
)
pipeline = Pipeline(stages=[gbt])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.6717
Accuracy: 0.6358
Precision: 0.6355
Recall: 0.6358
F1 Score: 0.6350


In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    maxDepth=10,
    stepSize=0.01,
    seed=42
)
pipeline = Pipeline(stages=[gbt])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.6528
Accuracy: 0.5864
Precision: 0.5866
Recall: 0.5864
F1 Score: 0.5865


In [None]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(
    featuresCol="features",
    labelCol="label",
    regParam=0.1,
    maxIter=100,
    standardization=True  # Auto-scales features
)


pipeline = Pipeline(stages=[svm])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7376
Accuracy: 0.6728
Precision: 0.6782
Recall: 0.6728
F1 Score: 0.6679


In [None]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(
    featuresCol="features",
    labelCol="label",
    regParam=0.2,
    maxIter=100,
    standardization=True  # Auto-scales features
)


pipeline = Pipeline(stages=[svm])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7198
Accuracy: 0.6605
Precision: 0.6668
Recall: 0.6605
F1 Score: 0.6542


In [None]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(
    featuresCol="features",
    labelCol="label",
    regParam=0.1,
    maxIter=1000,
    standardization=True  # Auto-scales features
)


pipeline = Pipeline(stages=[svm])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7387
Accuracy: 0.6728
Precision: 0.6782
Recall: 0.6728
F1 Score: 0.6679


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    minInstancesPerNode=10,
    impurity="gini",
    seed=42
)

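pipeline = Pipeline(stages=[dt])  # fit the decision tree; the original cell passed svm, so the output recorded below repeats the LinearSVC metrics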
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Area Under ROC (AUC): {auc:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

Area Under ROC (AUC): 0.7387
Accuracy: 0.6728
Precision: 0.6782
Recall: 0.6728
F1 Score: 0.6679
