In [1]:
## Use this section to suppress warnings generated by the code:
import warnings


def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``: accepts any arguments and does nothing."""
    return None


# Two layers of suppression: an 'ignore' filter for warnings raised through the
# normal machinery, and the no-op above swapped in for warnings.warn itself.
warnings.filterwarnings('ignore')
warnings.warn = warn

In [2]:
# Step 1. Import the necessary libraries
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression as SparkLogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Below are needed for Sequential Feature Selector
from mlxtend.feature_selection import SequentialFeatureSelector as SFS
from sklearn.linear_model import LogisticRegression as SKlearnLogisticRegression
from sklearn.model_selection import cross_val_score
import numpy as np

In [3]:
# Step 2. Create (or reuse) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("Logistic Regression using Spark ML")
    .getOrCreate()
)

24/05/15 17:59:48 WARN Utils: Your hostname, thinkpad-t14-g3 resolves to a loopback address: 127.0.1.1; using 172.16.1.182 instead (on interface wlp2s0)
24/05/15 17:59:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/15 17:59:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Step 3. Load the dataset from CSV into a Spark DataFrame
# (first row holds the column names; column types are inferred from the data)
sdf = spark.read.csv(
    "sources/drybeans.csv",
    header=True,
    inferSchema=True,
)

# Inspect what was loaded: the inferred schema, then the first 5 rows
sdf.printSchema()
sdf.show(5)

root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)

+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----+
| Area|Perimeter|MajorAxisLength|MinorAxisLength|AspectRation|Ec

In [5]:
# Step 4. Data Preprocessing
# Drop rows with missing values FIRST, so that the indexer is fitted on -- and the
# class counts printed below describe -- exactly the rows used by the rest of the
# notebook. (With StringIndexer's default handleInvalid="error", a null Class value
# would make transform() fail.)
# Dropping a pre-existing 'label' column is a no-op on the first run and makes this
# cell safe to re-run: without it, a second run stops because the indexer's output
# column already exists.
sdf = sdf.drop("label").dropna()

# Convert Class column from string to numerical values (this will be our target column)
indexer = StringIndexer(inputCol="Class", outputCol="label")
sdf = indexer.fit(sdf).transform(sdf)

# Print the value counts for the target column 'label'
sdf.groupBy('label').count().orderBy('count').show()

+-----+-----+
|label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+



                                                                                

Below, we use the Sequential Feature Selector (SFS) from mlxtend, which scores candidate feature subsets with cross-validation, to select the features. Because SFS works on in-memory data rather than on Spark DataFrames, the sampled data is first converted to a pandas DataFrame. Since the dataset is small here, we can easily use 80%-100% of the Spark DataFrame as the sample. However, if you plan to apply the same SFS approach to a much larger dataset, you should consider a smaller percentage (or a different method) when drawing the sample.

In [6]:
# Step 5. Sample the data for cross validation
# Note 1: No need to reduce the data size for a sample if it's a small dataset, but below we do so for demonstration purposes
# Note 2: You should get a smaller sample size if the Spark DataFrame is too large. But for this example, we will use 80% sample size
# A fixed seed keeps the sample -- and therefore the features selected in the next
# step -- reproducible across runs, consistent with the seeded train/test split later on.
# NOTE(review): the sample is drawn from the full DataFrame, so it overlaps the rows
# that are later held out for testing; feature selection therefore sees test rows.
sampled_data = sdf.sample(withReplacement=False, fraction=0.8, seed=42)

# Print total number of rows in the original and sampled data
print("Total rows in original DataFrame: ", sdf.count())
print("Total rows in sampled DataFrame: ", sampled_data.count())

# Drop the 'Class' column from the sampled data since it's a string column which is not needed for our testing
# (the numeric 'label' column carries the same information)
sampled_data = sampled_data.drop('Class')

# Convert the sampled data to Pandas DataFrame so that we can use it for cross validation in the next step
pandas_df = sampled_data.toPandas()

Total rows in original DataFrame:  13611
Total rows in sampled DataFrame:  10898


In [7]:
# Step 6. Perform cross validation using Sequential Feature Selector
# X is your matrix of features and y is the target variable
X = pandas_df.drop('label', axis=1)
y = pandas_df['label']

# Create a Logistic Regression model. Adjust the solver and max_iter as needed
lr = SKlearnLogisticRegression(solver='liblinear', max_iter=1000)

# Define the range of features to consider, let's choose between 4-6 features
k_features_range = range(4, 7)

# Plain forward selection (floating=False) is greedy: the subset of size k+1 is the
# subset of size k plus one more feature. A single fit up to the largest k therefore
# visits every smaller subset on the way, so there is no need to refit once per k.
sfs = SFS(lr,
          k_features=max(k_features_range),
          forward=True,
          floating=False,
          scoring='accuracy',
          cv=5)

# Fit the selector once
sfs.fit(X, y)

# Dictionary to hold the results and selected features
results = {}
selected_features = {}

for k in k_features_range:
    # subsets_ is keyed by subset size; each entry already holds the selected column
    # indices and the mean 5-fold CV accuracy computed during selection, so the
    # score does not have to be recomputed with cross_val_score.
    subset = sfs.subsets_[k]

    # Store the mean cross-validated score
    mean_score = subset['avg_score']
    results[k] = mean_score
    selected_features[k] = X.columns[list(subset['feature_idx'])].tolist()
    print(f'Number of features: {k}, Cross-Validated Accuracy: {mean_score}')

# Find the number of features with the best mean score
# (on a tie, max() keeps the first, i.e. the smallest, feature count)
best_feature_count = max(results, key=results.get)
best_score = results[best_feature_count]
best_features = selected_features[best_feature_count]

print(f'Best number of features: {best_feature_count}, with Accuracy score: {best_score}')
print(f'Selected features for the best result: {best_features}')

Number of features: 4, Cross-Validated Accuracy: 0.847867719810872
Number of features: 5, Cross-Validated Accuracy: 0.8482349028045016
Number of features: 6, Cross-Validated Accuracy: 0.8480495219168797
Best number of features: 5, with Accuracy score: 0.8482349028045016
Selected features for the best result: ['Perimeter', 'MajorAxisLength', 'EquivDiameter', 'ShapeFactor1', 'ShapeFactor2']


In [8]:
# Step 7. Use the features selected in the previous step as the model inputs and
# combine them into a single vector column named "features"
inputCols = best_features
assembler = VectorAssembler(
    inputCols=inputCols,
    outputCol="features",
)

In [9]:
# Step 8. Standardize the assembled feature vector ("features" -> "scaledFeatures")
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

In [10]:
# Step 9. Define the Spark logistic regression estimator; it trains on the scaled
# feature vector and predicts the indexed "label" column
classifier = SparkLogisticRegression(
    featuresCol="scaledFeatures",
    labelCol="label",
)

In [11]:
# Step 10. Chain the stages into one pipeline. Order matters: each stage consumes
# the column produced by the previous one (features -> scaledFeatures -> prediction)
pipeline = Pipeline(
    stages=[
        assembler,
        scaler,
        classifier,
    ]
)

In [12]:
# Step 11. Hold out 30% of the rows for testing; the fixed seed keeps the
# 70/30 split the same from run to run
training_data, testing_data = sdf.randomSplit([0.7, 0.3], seed=42)

In [13]:
# Step 12. Fit the model using the training data
# Fitting the pipeline runs its stages (assembler, scaler, classifier) in order on
# training_data; the fitted result is kept in `model` for use on the test set.
model = pipeline.fit(training_data)

# Ignore the warning messages (if any)

24/05/15 18:00:47 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/15 18:00:47 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [14]:
# Step 13. Generate predictions for the testing data with the fitted model
# (this only produces the predictions; the metrics are computed in Step 15)
predictions = model.transform(testing_data)

In [15]:
# Step 14. Show the first 10 predictions next to the actual values of the target column
(
    predictions
    .select("features", "label", "prediction")
    .show(10)
)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[524.736,183.9652...|  0.0|       0.0|
|[533.701,185.3819...|  0.0|       0.0|
|[530.825,191.9944...|  0.0|       0.0|
|[535.436,192.5302...|  0.0|       0.0|
|[538.454,196.5372...|  0.0|       0.0|
|[558.343,208.5232...|  0.0|       0.0|
|[545.616,191.6489...|  0.0|       0.0|
|[543.295,201.3477...|  0.0|       0.0|
|[551.696,204.7763...|  0.0|       0.0|
|[557.585,199.1229...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows



In [16]:
# Step 15. Evaluate the model performance further: Accuracy, Precision, Recall, and F1 score
# Each entry pairs an evaluator metric name with the text printed in front of its value.
metric_prefixes = [
    ("accuracy", "Accuracy ="),
    ("weightedPrecision", "Precision ="),
    ("weightedRecall", "Recall ="),
    ("f1", "F1 score = "),
]

metric_values = {}
for metric_name, prefix in metric_prefixes:
    # A fresh evaluator per metric; after the loop `evaluator` is the F1 evaluator
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric_name,
    )
    metric_values[metric_name] = evaluator.evaluate(predictions)
    print(prefix, metric_values[metric_name])

# Expose the individual scores under their own names
accuracy = metric_values["accuracy"]
precision = metric_values["weightedPrecision"]
recall = metric_values["weightedRecall"]
f1_score = metric_values["f1"]

Accuracy = 0.907467940658788
Precision = 0.907893570993255
Recall = 0.907467940658788
F1 score =  0.9075041112512361


In [17]:
# Step 16. Stop the SparkSession
# Run this last: once the session is stopped, `spark` can no longer be used by
# other cells until a new session is created.
spark.stop()