# Decision Trees

In [1]:
from pyspark.sql import SparkSession

# Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("Multiclass Failure Prediction") \
    .getOrCreate()

# Load the dataset
file_path = "C:/Users/Zahra/Desktop/ProjectBigDataAnalytics/data/predictive_maintenance_data_all_columns.csv"  
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Display schema and first few rows
data.printSchema()
data.show(5)

# Count rows and columns
print(f"Total rows: {data.count()}")
print(f"Total columns: {len(data.columns)}")

root
 |-- UDI: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air temperature [K]: double (nullable = true)
 |-- Process temperature [K]: double (nullable = true)
 |-- Rotational speed [rpm]: integer (nullable = true)
 |-- Torque [Nm]: double (nullable = true)
 |-- Tool wear [min]: integer (nullable = true)
 |-- Target: integer (nullable = true)
 |-- Failure Type: string (nullable = true)
 |-- failure_type_id: double (nullable = true)
 |-- type_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- date: date (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- day_of_week: string (nullable = true)

+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+------+------------+---------------+-------+-

In [2]:
from pyspark.sql.functions import col

# Step 2.1: Drop rows with missing values (or use .fillna() for filling missing data)
data_cleaned = data.na.drop()

# Step 2.2: Select essential features
# Exclude irrelevant features like 'UDI', 'Manufacturer', 'Timestamp', etc.
selected_features = ['Type', 'Air temperature [K]', 'Process temperature [K]', 
                     'Rotational speed [rpm]', 'Torque [Nm]', 
                     'Tool wear [min]', 'Failure Type']
data_selected = data_cleaned.select(selected_features)

# Step 2.3: Convert 'Failure Type' to numerical values for classification
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Failure Type", outputCol="Failure_Type_Index")
data_prepared = indexer.fit(data_selected).transform(data_selected)

# Show prepared data
data_prepared.show(5)

+----+-------------------+-----------------------+----------------------+-----------+---------------+------------+------------------+
|Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|Torque [Nm]|Tool wear [min]|Failure Type|Failure_Type_Index|
+----+-------------------+-----------------------+----------------------+-----------+---------------+------------+------------------+
|   M|              298.1|                  308.6|                  1551|       42.8|              0|  No Failure|               0.0|
|   L|              298.2|                  308.7|                  1408|       46.3|              3|  No Failure|               0.0|
|   L|              298.1|                  308.5|                  1498|       49.4|              5|  No Failure|               0.0|
|   L|              298.2|                  308.6|                  1433|       39.5|              7|  No Failure|               0.0|
|   L|              298.2|                  308.7|            

In [4]:
print(f"Total rows: {data.count()}")
print(f"Total columns: {len(data.columns)}")

Total rows: 10000
Total columns: 19


In [5]:
# Step 3: Split the data into training and testing sets
train_data, test_data = data_prepared.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train_data.count()} rows")
print(f"Test set: {test_data.count()} rows")

Training set: 8079 rows
Test set: 1921 rows


In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 4.1: Assemble features into a vector
feature_columns = ['Air temperature [K]', 'Process temperature [K]', 
                   'Rotational speed [rpm]', 'Torque [Nm]', 'Tool wear [min]']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Step 4.2: Train Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="Failure_Type_Index", featuresCol="features")
model = dt.fit(train_data)

# Step 4.3: Make predictions
predictions = model.transform(test_data)

# Display predictions
predictions.select("features", "Failure_Type_Index", "prediction").show(100)

IllegalArgumentException: Output column features already exists.

In [8]:
predictions.select("features", "Failure_Type_Index", "prediction").show(100)

+--------------------+------------------+----------+
|            features|Failure_Type_Index|prediction|
+--------------------+------------------+----------+
|[295.6,306.1,1256...|               0.0|       0.0|
|[295.6,306.2,1632...|               0.0|       0.0|
|[295.7,306.2,1458...|               0.0|       0.0|
|[296.2,307.0,1542...|               0.0|       0.0|
|[296.4,307.5,1403...|               0.0|       0.0|
|[296.6,307.4,1333...|               0.0|       0.0|
|[296.7,307.9,1424...|               0.0|       0.0|
|[296.8,308.0,1453...|               0.0|       0.0|
|[296.9,308.0,1416...|               0.0|       0.0|
|[296.9,308.3,1474...|               0.0|       0.0|
|[297.0,307.7,1834...|               0.0|       0.0|
|[297.0,307.9,1448...|               0.0|       0.0|
|[297.0,308.1,1410...|               0.0|       0.0|
|[297.1,307.9,1365...|               0.0|       0.0|
|[297.1,308.4,1654...|               0.0|       0.0|
|[297.2,308.5,1524...|               0.0|     

In [12]:
# Step 5: Evaluate the model
evaluator = MulticlassClassificationEvaluator(
    labelCol="Failure_Type_Index", 
    predictionCol="prediction", 
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Optional: Compute precision and recall
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
print(f"Precision: {precision}")
print(f"Recall: {recall}")


Accuracy: 0.9656428943258719
Precision: 0.9551314047935394
Recall: 0.9656428943258719


In [13]:
# Generate a confusion matrix
confusion_matrix = predictions.select("Failure_Type_Index", "prediction") \
                              .groupBy("Failure_Type_Index", "prediction") \
                              .count() \
                              .orderBy("Failure_Type_Index", "prediction")

confusion_matrix.show()


+------------------+----------+-----+
|Failure_Type_Index|prediction|count|
+------------------+----------+-----+
|               0.0|       0.0| 1830|
|               0.0|       1.0|    8|
|               0.0|       3.0|   10|
|               1.0|       0.0|   11|
|               1.0|       1.0|   13|
|               1.0|       2.0|    2|
|               2.0|       0.0|   17|
|               2.0|       2.0|    1|
|               2.0|       3.0|    1|
|               3.0|       0.0|    5|
|               3.0|       1.0|    1|
|               3.0|       3.0|   11|
|               4.0|       0.0|    6|
|               5.0|       0.0|    5|
+------------------+----------+-----+



In [14]:
# F1 Score
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print(f"F1 Score: {f1_score}")

# Precision
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
print(f"Precision: {precision}")

# Recall
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
print(f"Recall: {recall}")


F1 Score: 0.9591967204019548
Precision: 0.9551314047935394
Recall: 0.9656428943258719


Py4JJavaError: An error occurred while calling o1025.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 103.0 failed 1 times, most recent failure: Lost task 6.0 in stage 103.0 (TID 98) (10.10.10.5 executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1170)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1089)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:108)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:500)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:159)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1126)
	... 42 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1170)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1089)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:108)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:500)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:159)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1126)
	... 42 more
