# 5-Data-Mining Method(s) Selection

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Create a Spark session
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Load data
csv_file_path = 'Merged-data.csv'
df = spark.read.format("csv").option("header", "true").load(csv_file_path)

# Using Binary target variable (1 or 0)
# If 'ClientsSeenRate' > threshold, set 'Target' to 1; otherwise, set it to 0.
threshold = 200
df = df.withColumn('Target', (df['ClientsSeenRate'] > threshold).cast("integer"))

# Select the features and target variable
selected_cols = ['Gender', 'AgeGroup', 'Ethnicity']
feature_cols = ['encoded_' + col for col in selected_cols]

# Encode categorical variables (Gender, AgeGroup, Ethnicity)
indexers = [StringIndexer(inputCol=col, outputCol='encoded_' + col).fit(df) for col in selected_cols]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=42)

# Create and train the logistic regression model
logistic_model = LogisticRegression(featuresCol="features", labelCol="Target")
model = logistic_model.fit(train_data)

# Make predictions on the test set
predictions = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(predictions)

# Print the results
print("Area under ROC = {}".format(accuracy))

# Stop the Spark session
spark.stop()



Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/06 19:09:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Area under ROC = 0.689877250807834


In [2]:
df.printSchema()

root
 |-- Record_ID: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- AgeGroup: string (nullable = true)
 |-- OrgType: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- ActivityCode: string (nullable = true)
 |-- ActivityType: string (nullable = true)
 |-- ClientsSeen: string (nullable = true)
 |-- ClientsSeenFaceToFace: string (nullable = true)
 |-- Bednights: string (nullable = true)
 |-- Contacts: string (nullable = true)
 |-- FaceToFaceContacts: string (nullable = true)
 |-- AgeBins: string (nullable = true)
 |-- ClientsSeenRate: string (nullable = true)
 |-- ClientsSeenFaceToFaceRate: string (nullable = true)
 |-- BednightsRate: string (nullable = true)
 |-- ContactsRate: string (nullable = true)
 |-- FaceToFaceContactsRate: string (nullable = true)
 |-- TeamType: string (nullable = true)
 |-- Target: integer (nullable = true)
 |-- encoded_Gender: double (nullable = fals

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# Create a Spark session
spark = SparkSession.builder.appName("LogisticRegressionCV").getOrCreate()

# Load data
csv_file_path = 'Merged-data.csv'
df = spark.read.format("csv").option("header", "true").load(csv_file_path)

# Using Binary target variable (1 or 0)
# If 'ClientsSeenRate' > threshold, set 'Target' to 1; otherwise, set it to 0.
threshold = 200
df = df.withColumn('Target', (df['ClientsSeenRate'] > threshold).cast("integer"))

# Select the features and target variable
selected_cols = ['Gender', 'AgeGroup', 'Ethnicity']
feature_cols = ['encoded_' + col for col in selected_cols]

# Encode categorical variables (Gender, AgeGroup, Ethnicity)
indexers = [StringIndexer(inputCol=col, outputCol='encoded_' + col).fit(df) for col in selected_cols]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=42)

# Create a logistic regression model
logistic_model = LogisticRegression(featuresCol="features", labelCol="Target")

# Create a parameter grid for cross-validation
param_grid = ParamGridBuilder() \
    .addGrid(logistic_model.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(logistic_model.elasticNetParam, [0.0, 0.1, 0.2]) \
    .build()

# Set up the cross-validator
cross_validator = CrossValidator(estimator=logistic_model,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderROC"),
                                 numFolds=4,  # We can adjust the number of folds as needed
                                 seed=42)

# Run cross-validation and choose the best set of parameters
cv_model = cross_validator.fit(train_data)

# Make predictions on the test set using the best model
cv_predictions = cv_model.transform(test_data)

# Evaluate the model
cv_accuracy = evaluator.evaluate(cv_predictions)

# Print the results
print("Best Area under ROC after Cross-Validation = {}".format(cv_accuracy))

# Stop the Spark session
spark.stop()


                                                                                

Best Area under ROC after Cross-Validation = 0.8023558392894317


In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# Create a Spark session
spark = SparkSession.builder.appName("RandomForestClassifierCV").getOrCreate()

# Load data
csv_file_path = 'Merged-data.csv'
df = spark.read.format("csv").option("header", "true").load(csv_file_path)

# Using Binary target variable (1 or 0)
# If 'ClientsSeenRate' > threshold, set 'Target' to 1; otherwise, set it to 0.
threshold = 200
df = df.withColumn('Target', (df['ClientsSeenRate'] > threshold).cast("integer"))

# Select the features and target variable
selected_cols = ['Gender', 'AgeGroup', 'Ethnicity']
feature_cols = ['encoded_' + col for col in selected_cols]

# Encode categorical variables (Gender, AgeGroup, Ethnicity)
indexers = [StringIndexer(inputCol=col, outputCol='encoded_' + col).fit(df) for col in selected_cols]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=42)

# Create a Random Forest classifier
rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="Target")

# Create a parameter grid for cross-validation
param_grid = ParamGridBuilder() \
    .addGrid(rf_classifier.numTrees, [10, 20, 30]) \
    .addGrid(rf_classifier.maxDepth, [5, 10, 15]) \
    .build()

# Set up the cross-validator
cross_validator = CrossValidator(estimator=rf_classifier,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderROC"),
                                 numFolds=4,  # We can adjust the number of folds as needed
                                 seed=42)

# Run cross-validation and choose the best set of parameters
cv_model = cross_validator.fit(train_data)

# Make predictions on the test set using the best model
cv_predictions = cv_model.transform(test_data)

# Evaluate the model
cv_accuracy = evaluator.evaluate(cv_predictions)

# Print the results
print("Best Area under ROC after Cross-Validation = {}".format(cv_accuracy))

# Stop the Spark session
spark.stop()




Best Area under ROC after Cross-Validation = 0.8137707306918326


                                                                                

In [10]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# Create a Spark session
spark = SparkSession.builder.appName("RandomForestClassifierCV").getOrCreate()

# Load data
csv_file_path = 'Merged-data.csv'
df = spark.read.format("csv").option("header", "true").load(csv_file_path)

# Using Binary target variable (1 or 0)
# If 'ClientsSeenRate' > threshold, set 'Target' to 1; otherwise, set it to 0.
threshold = 200
df = df.withColumn('Target', (df['ClientsSeenRate'] > threshold).cast("integer"))

# Select the features and target variable
selected_cols = ['Gender', 'AgeGroup', 'Ethnicity']
feature_cols = ['encoded_' + col for col in selected_cols]

# Encode categorical variables (Gender, AgeGroup, Ethnicity)
indexers = [StringIndexer(inputCol=col, outputCol='encoded_' + col).fit(df) for col in selected_cols]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=42)

# Create a Random Forest classifier
rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="Target")

# Create a parameter grid for cross-validation
param_grid = ParamGridBuilder() \
    .addGrid(rf_classifier.numTrees, [10, 20, 30]) \
    .addGrid(rf_classifier.maxDepth, [5, 10, 15]) \
    .build()

# Set up the cross-validator
cross_validator = CrossValidator(estimator=rf_classifier,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderROC"),
                                 numFolds=4,  # We can adjust the number of folds as needed
                                 seed=42)

# Run cross-validation and choose the best set of parameters
cv_model = cross_validator.fit(train_data)

# Make predictions on the test set using the best model
cv_predictions = cv_model.transform(test_data)

# Calculate accuracy
accuracy = evaluator.evaluate(cv_predictions, {evaluator.metricName: "accuracy"})
print("Accuracy:", accuracy)





Accuracy: 0.8137707306918326


                                                                                

In [19]:
evaluator = BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
auc_pr = evaluator.evaluate(cv_predictions)
print("AUC-PR:", auc_pr)




AUC-PR: 0.42927963840255934


                                                                                

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(cv_predictions)
print("F1 Score:", f1_score)




F1 Score: 0.7563198045291728


                                                                                

In [21]:
true_positives = cv_predictions.filter("prediction = 1 AND Target = 1").count()
false_positives = cv_predictions.filter("prediction = 1 AND Target = 0").count()
false_negatives = cv_predictions.filter("prediction = 0 AND Target = 1").count()

precision = true_positives / (true_positives + false_positives)
recall = true_positives / (true_positives + false_negatives)

print("Precision:", precision)
print("Recall:", recall)




Precision: 0.6701137538779731
Recall: 0.11375405951022557


                                                                                

In [22]:
confusion_matrix = cv_predictions.groupBy("Target", "prediction").count()
confusion_matrix.show()




+------+----------+-----+
|Target|prediction|count|
+------+----------+-----+
|     1|       0.0|10097|
|     0|       0.0|45613|
|     1|       1.0| 1296|
|     0|       1.0|  638|
+------+----------+-----+



                                                                                

In [23]:
precision = 1296 / (1296 + 638)
print("Precision:", precision)

recall = 1296 / (1296 + 10097)
print("Recall:", recall)

fpr = 638 / (638 + 45613)
print("False Positive Rate:", fpr)

fpr = 638 / (638 + 45613)
print("False Positive Rate:", fpr)

accuracy = (1296 + 45613) / (1296 + 45613 + 638 + 10097)
print("Accuracy:", accuracy)


Precision: 0.6701137538779731
Recall: 0.11375405951022557
False Positive Rate: 0.013794296339538605
False Positive Rate: 0.013794296339538605
Accuracy: 0.8137707306918326
