## Predictive Analytics for Enhancing Machine Maintenance Optimization

Team Members:
1. Penchala Akshay Kumar Kandagaddala
2. Yogesh Savirigana
3. Venkata Satya Naveen Nagulapalli
4. Shashank Rao Gujja
5. Sudeshna Mullaguru
6. Vijaya Lakshmi Kalpana Potti

### Business Challenge:

In the modern manufacturing industry, minimizing unplanned downtime caused by equipment failures is of utmost importance. Unplanned downtime can lead to substantial financial losses, decreased productivity, and disrupted operations. To tackle this challenge, manufacturers are embracing predictive maintenance (PdM) solutions. These solutions use data analytics to predict and prevent potential machine malfunctions before they disrupt production. By analyzing historical data, monitoring equipment in real time, and applying machine learning algorithms, manufacturers can identify patterns and anomalies that signal impending issues. This proactive approach not only saves time and resources but also improves efficiency and overall profitability in the manufacturing process.

### Problem Statement:

In the highly competitive manufacturing sector, maximizing equipment uptime is a critical factor in maintaining productivity and profitability. Traditional preventive maintenance practices, though well-intentioned, often fall short of ensuring peak equipment performance and minimizing costly downtime. These methods rely on time-based schedules or usage-based thresholds, leading to unnecessary maintenance interventions, increased expenses, and production disruptions.

Predictive maintenance (PdM) shifts the focus of equipment maintenance from reactive repairs to proactive prevention. By harnessing data analytics and machine learning, PdM enables manufacturers to anticipate and forestall machine failures, substantially reducing unplanned downtime and its associated financial burden.

At the heart of PdM is its capability to accurately predict the likelihood of machine failures and identify potential failure modes. This predictive ability hinges on continuous data collection and analysis from various sensors and monitoring devices installed on critical equipment. These data streams encompass variables such as vibration, temperature, pressure, and other parameters that offer insights into machinery health and condition.

Once a machine failure is anticipated, PdM systems provide comprehensive insights into the most probable type of failure and an estimate of the time remaining until the failure occurs (ETTF). Armed with this vital information, maintenance teams can proactively schedule maintenance activities, addressing potential issues before they disrupt production operations.

### Data Source:
The data is sourced from the UCI Machine Learning Repository and is available at: https://archive.ics.uci.edu/dataset/601/ai4i+2020+predictive+maintenance+dataset

### Dataset Overview:

The Machine Predictive Maintenance Classification Dataset is a synthetic dataset that reflects real predictive maintenance data encountered in industry. It consists of 10,000 data points with 14 columns, including two primary targets. The dataset is designed to be challenging for machine learning algorithms, as it contains several different failure modes and noise.

- 14 columns: identifiers (`UDI`, `Product ID`), the product `Type`, five sensor and process readings, and six failure labels
- Two primary targets:
  - "Failure or Not" (`Machine failure`): indicates whether a machine failure has occurred
  - "Failure Type" (`TWF`, `HDF`, `PWF`, `OSF`, `RNF`): indicates which failure mode caused the failure

## Importing Data

In [1]:
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
from os.path import abspath
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("ISM6562 PySpark Tutorials") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()


# Get the SparkContext object, the entry point to the core Spark API. It is created together with the SparkSession.
sc = spark.sparkContext

# Note: if multiple Spark sessions are running (for example, from a notebook you ran earlier),
# this session's web UI will be on a different port than the default (4040). The line below
# identifies the port: it is 4040 if only one Spark session is running, and a higher number
# if other Spark sessions are still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)



Spark Session WebUI Port: 4040


In [2]:
spark

In [3]:
# Read the CSV using its header row and let Spark infer the column types
df = spark.read.csv('data/ai4i2020.csv', header=True, inferSchema=True)

# display the first 5 rows of the dataframe
df.show(5)


+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|UDI|Product ID|Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|Torque [Nm]|Tool wear [min]|Machine failure|TWF|HDF|PWF|OSF|RNF|
+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|  1|    M14860|   M|              298.1|                  308.6|                  1551|       42.8|              0|              0|  0|  0|  0|  0|  0|
|  2|    L47181|   L|              298.2|                  308.7|                  1408|       46.3|              3|              0|  0|  0|  0|  0|  0|
|  3|    L47182|   L|              298.1|                  308.5|                  1498|       49.4|              5|              0|  0|  0|  0|  0|  0|
|  4|    L47183|   L|              298.2|                  308.6|                 

### Inspect the data

In [4]:
df.printSchema()
df.show()

root
 |-- UDI: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air temperature [K]: double (nullable = true)
 |-- Process temperature [K]: double (nullable = true)
 |-- Rotational speed [rpm]: integer (nullable = true)
 |-- Torque [Nm]: double (nullable = true)
 |-- Tool wear [min]: integer (nullable = true)
 |-- Machine failure: integer (nullable = true)
 |-- TWF: integer (nullable = true)
 |-- HDF: integer (nullable = true)
 |-- PWF: integer (nullable = true)
 |-- OSF: integer (nullable = true)
 |-- RNF: integer (nullable = true)

+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|UDI|Product ID|Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|Torque [Nm]|Tool wear [min]|Machine failure|TWF|HDF|PWF|OSF|RNF|
+---+----------+----+-------------------+-----------------------+----------------------+------

#### Column Descriptions

| Column Name               | Description |
|---------------------------|-------------|
| `UDI`                     | Unique identifier for each record. |
| `Product ID`              | Identifier for the product: a quality-variant letter followed by a serial number (e.g. `M14860`). |
| `Type`                    | Product quality variant: `L` (low), `M` (medium) or `H` (high). |
| `Air temperature [K]`     | Ambient air temperature measured in Kelvin. |
| `Process temperature [K]` | Temperature of the process, measured in Kelvin. |
| `Rotational speed [rpm]`  | Speed at which the machine operates, measured in revolutions per minute. |
| `Torque [Nm]`             | The torque produced by the machine, measured in Newton meters. |
| `Tool wear [min]`         | Cumulative tool wear, expressed as minutes of tool usage. |
| `Machine failure`         | Indicates if there was a machine failure (1) or not (0). |
| `TWF`                     | Tool Wear Failure - Indicates if the failure was due to tool wear. |
| `HDF`                     | Heat Dissipation Failure - Indicates if the failure was due to insufficient heat dissipation. |
| `PWF`                     | Power Failure - Indicates if the failure was due to the process power (torque × rotational speed) falling outside its permitted range. |
| `OSF`                     | Overstrain Failure - Indicates if the failure was due to overstrain. |
| `RNF`                     | Random Failures - Indicates if the failure was random. |


## Data cleaning and preprocessing

In [5]:
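# Rename columns to replace spaces with underscores
# (the loop variable is not called `col`, so it cannot shadow pyspark.sql.functions.col)
for column_name in df.columns:
    df = df.withColumnRenamed(column_name, column_name.replace(" ", "_"))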

In [6]:
from pyspark.sql.functions import col
 
# Create a new DataFrame with lowercase column names
df = df.select([col(column).alias(column.lower()) for column in df.columns])
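Cells [5] and [6] normalize every column name in two passes. The plain-Python sketch below mirrors that transformation on the 14 original names; the helper `normalize_column_name` is hypothetical, not a PySpark API. It also shows why later SQL queries quote names such as `torque_[nm]` in backticks: the square brackets survive the renaming, and unquoted Spark SQL identifiers are generally limited to letters, digits and underscores.

```python
# Hypothetical helper mirroring cells [5] and [6]: spaces -> underscores, then lowercase.
def normalize_column_name(name: str) -> str:
    return name.replace(" ", "_").lower()


# The 14 column names printed by df.printSchema() in cell [4].
original_columns = [
    "UDI", "Product ID", "Type", "Air temperature [K]", "Process temperature [K]",
    "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]", "Machine failure",
    "TWF", "HDF", "PWF", "OSF", "RNF",
]
normalized = [normalize_column_name(c) for c in original_columns]

# Names containing anything besides letters, digits and underscores need backticks in Spark SQL.
needs_backticks = [c for c in normalized if not c.replace("_", "").isalnum()]

print(normalized)
print(needs_backticks)
```

In PySpark the same renaming could likely be done in a single step with `df.toDF(*[normalize_column_name(c) for c in df.columns])`, which avoids one `withColumnRenamed` call per column.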

In [7]:
from pyspark.sql.functions import col, when, count

# Check for missing values in each column
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the missing values for each column
missing_values.show()


+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|udi|product_id|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|  0|         0|   0|                  0|                      0|                     0|          0|              0|              0|  0|  0|  0|  0|  0|
+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
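The check in cell [7] relies on a common Spark idiom: `when(col(c).isNull(), c)` yields a non-null value only for null cells (there is no `otherwise`, so every other row becomes null), and `count` skips nulls, so the aggregate counts the missing values per column. The plain-Python sketch below mirrors that logic on three hypothetical rows, with `None` standing in for a Spark null; the helper names are illustrative only.

```python
# Three hypothetical rows; None plays the role of a Spark null.
rows = [
    {"type": "M", "torque_[nm]": 42.8},
    {"type": "L", "torque_[nm]": None},
    {"type": None, "torque_[nm]": None},
]


def when_is_null(value, marker):
    # Like when(col(c).isNull(), c) with no otherwise(): non-null only when value is null.
    return marker if value is None else None


def count_non_null(values):
    # Like count(): null entries are not counted.
    return sum(v is not None for v in values)


missing = {c: count_non_null(when_is_null(row[c], c) for row in rows) for c in rows[0]}
print(missing)  # {'type': 1, 'torque_[nm]': 2}
```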



In [8]:
# 'udi' and 'product_id' are identifiers rather than predictive features, so drop them for modeling
df = df.drop('udi', 'product_id')
df.show()

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|   M|              298.1|                  308.6|                  1551|       42.8|              0|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.7|                  1408|       46.3|              3|              0|  0|  0|  0|  0|  0|
|   L|              298.1|                  308.5|                  1498|       49.4|              5|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.6|                  1433|       39.5|              7|              0|  0|  0|  0|  0|  0|
|   L|              298.2|        

# Data Exploration using SQL

In [9]:
# List all tables present in the current catalog
spark.sql("SHOW TABLES").show()



+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|machine_failure_t...|      false|
|  default|        movieratings|      false|
|  default|              movies|      false|
+---------+--------------------+-----------+





In [10]:
spark.catalog.listTables()

[Table(name='machine_failure_table', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movieratings', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movies', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

In [11]:
# Drop the table if it exists, then save the DataFrame as a new managed table named Machine_Failure_Table
spark.sql('drop table if exists Machine_Failure_Table')
df.write.saveAsTable('Machine_Failure_Table')



In [12]:
# The table is a managed (permanent) table, not a temporary view, so isTemporary shows false below
spark.sql("SHOW TABLES").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|machine_failure_t...|      false|
|  default|        movieratings|      false|
|  default|              movies|      false|
+---------+--------------------+-----------+



In [13]:
#select all the fields in the table
query1 = spark.sql("SELECT * FROM Machine_Failure_Table;")
query1.show()

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|   M|              298.1|                  308.6|                  1551|       42.8|              0|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.7|                  1408|       46.3|              3|              0|  0|  0|  0|  0|  0|
|   L|              298.1|                  308.5|                  1498|       49.4|              5|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.6|                  1433|       39.5|              7|              0|  0|  0|  0|  0|  0|
|   L|              298.2|        

In [14]:
# Count all rows in the table
query2 = spark.sql("SELECT COUNT(*) FROM Machine_Failure_Table;")
query2.show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [15]:
# Find the rows where the machine did not fail (machine_failure = 0) and the torque was greater than 50 Nm
query3 = spark.sql("""
SELECT * FROM Machine_Failure_Table
WHERE Machine_failure = 0 AND `Torque_[Nm]` > 50;
""")
query3.show()

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|   M|              298.6|                  309.1|                  1339|       51.1|             34|              0|  0|  0|  0|  0|  0|
|   H|              298.8|                  309.2|                  1306|       54.5|             50|              0|  0|  0|  0|  0|  0|
|   H|              298.9|                  309.2|                  1379|       50.7|            106|              0|  0|  0|  0|  0|  0|
|   L|              298.8|                  309.1|                  1350|       52.5|            111|              0|  0|  0|  0|  0|  0|
|   L|              298.8|        

In [16]:
#average air temperature for each type
query4 = spark.sql("""
SELECT Type, AVG(`Air_temperature_[K]`) AS avg_air_temperature
FROM Machine_Failure_Table
GROUP BY Type
""")

query4.show()

+----+-------------------+
|Type|avg_air_temperature|
+----+-------------------+
|   M| 300.02926259592965|
|   L|  300.0158333333335|
|   H|  299.8669990029907|
+----+-------------------+



In [17]:
# Select all fields where Process temperature is between 300 and 400 Kelvin and the rotational speed is greater than 1000 rpm
query5 = spark.sql("""
SELECT * FROM Machine_Failure_Table
WHERE `Process_temperature_[K]` BETWEEN 300 AND 400
AND `Rotational_speed_[rpm]` > 1000;
""")

query5.show()

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|   M|              298.1|                  308.6|                  1551|       42.8|              0|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.7|                  1408|       46.3|              3|              0|  0|  0|  0|  0|  0|
|   L|              298.1|                  308.5|                  1498|       49.4|              5|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.6|                  1433|       39.5|              7|              0|  0|  0|  0|  0|  0|
|   L|              298.2|        

In [18]:
#Average tool wear for each type of product
query6 = spark.sql("""
SELECT Type, AVG(`Tool_wear_[min]`) AS avg_tool_wear
FROM Machine_Failure_Table
GROUP BY Type
ORDER BY avg_tool_wear DESC;
""")

query6.show()

+----+------------------+
|Type|     avg_tool_wear|
+----+------------------+
|   L|108.37883333333333|
|   H|  107.419740777667|
|   M|107.27227227227228|
+----+------------------+



In [19]:
# Calculating average rotational speed and torque for failed and non-failed machines
query7=spark.sql("""
SELECT 
    Machine_failure, 
    AVG(`rotational_speed_[rpm]`) as avg_rotational_speed, 
    AVG(`torque_[Nm]`) as avg_torque
FROM Machine_Failure_Table
GROUP BY Machine_failure
""")
query7.show()

+---------------+--------------------+------------------+
|Machine_failure|avg_rotational_speed|        avg_torque|
+---------------+--------------------+------------------+
|              1|   1496.486725663717|50.168141592920364|
|              0|  1540.2600144912535| 39.62965531518476|
+---------------+--------------------+------------------+



In [20]:
# Average air temperature and process temperature for each product type

query8 = spark.sql(""" SELECT Type, AVG(`air_temperature_[K]`) AS avg_air_temperature, AVG(`process_temperature_[K]`) AS avg_process_temperature
FROM Machine_Failure_Table
GROUP BY Type;""")

query8.show()


+----+-------------------+-----------------------+
|Type|avg_air_temperature|avg_process_temperature|
+----+-------------------+-----------------------+
|   M| 300.02926259592965|      310.0187854521179|
|   L|  300.0158333333335|      310.0123000000004|
|   H|  299.8669990029907|      309.9257228315058|
+----+-------------------+-----------------------+



In [21]:
df.show()

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|twf|hdf|pwf|osf|rnf|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|   M|              298.1|                  308.6|                  1551|       42.8|              0|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.7|                  1408|       46.3|              3|              0|  0|  0|  0|  0|  0|
|   L|              298.1|                  308.5|                  1498|       49.4|              5|              0|  0|  0|  0|  0|  0|
|   L|              298.2|                  308.6|                  1433|       39.5|              7|              0|  0|  0|  0|  0|  0|
|   L|              298.2|        

# Train_Test_Split

#### We consider only the columns type, air_temperature_[k], process_temperature_[k], rotational_speed_[rpm], torque_[nm], tool_wear_[min] and machine_failure for the train-test split. The columns TWF, HDF, PWF, OSF and RNF indicate specific types of failure. Including them as features would directly give away whether a machine failure occurred, which is exactly what we are trying to predict.
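To make the leakage argument concrete, the sketch below uses four hypothetical rows (invented for illustration, not taken from `ai4i2020.csv`). The UCI dataset description indicates that `machine_failure` is set when at least one failure mode is true, so a rule that ignores every sensor reading and simply ORs the five flags already reproduces the label.

```python
FAILURE_MODES = ["twf", "hdf", "pwf", "osf", "rnf"]

# Hypothetical rows for illustration only (not taken from ai4i2020.csv).
rows = [
    {"torque_[nm]": 42.8, "twf": 0, "hdf": 0, "pwf": 0, "osf": 0, "rnf": 0, "machine_failure": 0},
    {"torque_[nm]": 65.1, "twf": 0, "hdf": 0, "pwf": 0, "osf": 1, "rnf": 0, "machine_failure": 1},
    {"torque_[nm]": 38.0, "twf": 1, "hdf": 0, "pwf": 0, "osf": 0, "rnf": 0, "machine_failure": 1},
    {"torque_[nm]": 55.3, "twf": 0, "hdf": 0, "pwf": 0, "osf": 0, "rnf": 0, "machine_failure": 0},
]


def predict_from_flags(row):
    # Ignores every sensor reading and just ORs the failure-mode flags.
    return int(any(row[mode] for mode in FAILURE_MODES))


accuracy = sum(predict_from_flags(r) == r["machine_failure"] for r in rows) / len(rows)
print(accuracy)  # 1.0
```

A model given these flags would learn this shortcut instead of the sensor patterns we actually want it to capture.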

In [22]:
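# Feature columns plus the label (named selected_cols so the Python built-in `input` is not shadowed)
selected_cols = ['type','air_temperature_[k]','process_temperature_[k]','rotational_speed_[rpm]','torque_[nm]','tool_wear_[min]','machine_failure']

In [23]:
# 70/30 random split; pass seed=<int> to randomSplit to make the split reproducible
train_data,test_data=df.randomSplit([0.7,0.3])
train_data = train_data.select(selected_cols)
test_data = test_data.select(selected_cols)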
train_data.show(5)

+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+
|type|air_temperature_[k]|process_temperature_[k]|rotational_speed_[rpm]|torque_[nm]|tool_wear_[min]|machine_failure|
+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+
|   H|              295.5|                  305.9|                  1593|       37.2|            197|              0|
|   H|              295.6|                  306.0|                  1396|       52.4|              0|              0|
|   H|              295.6|                  306.1|                  1256|       62.3|            142|              0|
|   H|              295.6|                  306.1|                  1623|       30.9|            210|              0|
|   H|              295.6|                  306.2|                  1329|       55.3|            135|              0|
+----+-------------------+-----------------------+------

# Feature Engineering

### Data Imbalance issue

In [24]:
machine_failure_count = train_data.groupBy('machine_failure').count()

# Show the result
machine_failure_count.show()

+---------------+-----+
|machine_failure|count|
+---------------+-----+
|              1|  227|
|              0| 6713|
+---------------+-----+



In [25]:
from pyspark.sql.functions import col, explode, array, lit, rand
# Separate majority and minority classes
major_df = train_data.filter(col('machine_failure') == 0)
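# Take minority rows from the training split only; drawing them from test_data would leak test labels into training
minor_df = train_data.filter(col('machine_failure') == 1)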

In [26]:
# Calculate the oversampling ratio
ratio = major_df.count() // minor_df.count()
print(ratio)

59


#### A majority-to-minority ratio this large clearly indicates that the data is imbalanced and needs to be resampled to balance the classes.
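The oversampling in the next cell repeats every minority row `ratio` times by exploding an array of `ratio` literals. The stdlib sketch below mirrors that replication on placeholder rows and works through the numbers for a training-only minority, using the training-split counts printed in cell [24] (6,713 non-failures and 227 failures). It also notes that the printed ratio of 59 equals 6,713 // 112, and 112 is the number of failures in the test split (3 + 109 in the confusion matrices below), which suggests the minority rows in this run were drawn from `test_data`.

```python
# Training-split class counts printed by cell [24].
majority_count = 6713
minority_count = 227

# Same integer ratio as cell [26], computed on training rows only.
ratio = majority_count // minority_count

# Placeholder minority rows; each is copied `ratio` times, like explode(array(lit(0), ..., lit(ratio - 1))).
minority_rows = [("failure_row", i) for i in range(minority_count)]
oversampled = [row for row in minority_rows for _ in range(ratio)]

# For comparison: the ratio printed in cell [26] matches the 112 test-split failures.
test_split_ratio = majority_count // 112

print(ratio, len(oversampled), test_split_ratio)  # 29 6583 59
```

Because the ratio is rounded down, the classes end up close to, but not exactly, balanced.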

In [27]:
# Oversample the minority class:
# explode an array of `ratio` literals so every minority row is repeated `ratio` times,
# then drop the helper column
oversampled_minor_df = minor_df.withColumn('oversample', explode(array([lit(x) for x in range(ratio)]))).drop('oversample')

# Combine the oversampled minority rows with the majority class
# (union keeps duplicates and matches columns by position, like SQL UNION ALL)
balanced_df = major_df.union(oversampled_minor_df)

# Shuffle the DataFrame (optional but recommended)
balanced_df = balanced_df.orderBy(rand())

In [28]:
train_data = balanced_df

### After oversampling, the two classes are approximately balanced

In [29]:
machine_failure_count = train_data.groupBy('machine_failure').count()

# Show the result
machine_failure_count.show()

+---------------+-----+
|machine_failure|count|
+---------------+-----+
|              0| 6713|
|              1| 6608|
+---------------+-----+



### Converting the type variable to an index and encoding it to a vector to pass it as input to the model

In [30]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
 
type_indexer = StringIndexer(inputCol="type", outputCol="TypeIndex")
type_encoder = OneHotEncoder(inputCols=["TypeIndex"], outputCols=["TypeVec"])
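By default, Spark's `StringIndexer` orders labels by descending frequency (the most frequent label gets index 0), and `OneHotEncoder` drops the last category (`dropLast=True`), so three product types become 2-element vectors. The sketch below reproduces those defaults in plain Python on a hypothetical sample of types; the helper names are our own, and breaking frequency ties alphabetically is our reading of Spark's documented behaviour.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot vector of length num_categories - 1; the last category maps to all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


sample_types = ["L", "L", "M", "L", "H", "M", "L"]  # hypothetical sample
type_index = frequency_desc_index(sample_types)
encoded = {t: one_hot_drop_last(i, len(type_index)) for t, i in type_index.items()}
print(type_index)  # {'L': 0, 'M': 1, 'H': 2}
print(encoded)     # {'L': [1.0, 0.0], 'M': [0.0, 1.0], 'H': [0.0, 0.0]}
```

In the pipeline the indices are learned from `train_data` when `fit` runs, so the exact mapping depends on the type frequencies in the (oversampled) training set.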

In [31]:
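# Note: this only modifies `df`; train_data and test_data keep the 'type' column that StringIndexer needs
df = df.drop('type')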

In [32]:
#Create Feature Vectors:

from pyspark.ml.feature import VectorAssembler

# Feature engineering
assembler = VectorAssembler(inputCols=["TypeVec", "air_temperature_[k]", "process_temperature_[k]", 
                                       "rotational_speed_[rpm]", "torque_[nm]", 
                                       "tool_wear_[min]"], outputCol="features")

In [33]:
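# Scale the assembled feature vector:

from pyspark.ml.feature import StandardScaler

# Divide each feature by its standard deviation without mean-centering. A model only uses the
# scaled vector if its featuresCol is "scaledFeatures"; tree-based models are insensitive to scaling.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)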
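With `withStd=True, withMean=False`, `StandardScaler` divides each feature by its corrected (n - 1) sample standard deviation and leaves the mean in place. The stdlib sketch below applies that rule to the first four torque readings shown in cell [8]; it illustrates the arithmetic only and is not Spark's implementation.

```python
import statistics

# First four torque readings shown by df.show() in cell [8].
torque = [42.8, 46.3, 49.4, 39.5]

# withStd=True: divide by the corrected (n - 1) sample standard deviation.
# withMean=False: do not subtract the mean, so values are rescaled but not centred.
std = statistics.stdev(torque)
scaled = [x / std for x in torque]

print([round(x, 3) for x in scaled])
print(round(statistics.stdev(scaled), 6))  # 1.0
```

Skipping mean-centering keeps sparse inputs such as the one-hot `TypeVec` entries sparse, which is presumably why `withMean=False` was chosen.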

## Identifying the evaluation metric

#### In the context of predictive maintenance, where the dataset involves machine operations and potential failures, the primary concern is the cost associated with missing an actual failure (false negative). Not identifying a machine failure in time can lead to unplanned downtime, increased repair costs, safety risks, and significant operational disruptions. Therefore, our objective is to minimize these false negatives as much as possible. This makes recall a crucial metric for our predictive model.
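For reference, the metrics reported below are defined from the confusion-matrix counts for the failure class (TP, FP, FN, TN):

$$
\text{Recall} = \frac{TP}{TP + FN}, \qquad
\text{Precision} = \frac{TP}{TP + FP}, \qquad
F_1 = \frac{2 \cdot \text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}
$$

For example, the decision tree's test-set counts reported below (TP = 109, FN = 3, FP = 195) give Recall = 109/112 ≈ 0.973 and Precision = 109/304 ≈ 0.359: nearly every failure is caught, at the cost of many false alarms.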

## Building the Decision Tree Classifier

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

dt_model = DecisionTreeClassifier(labelCol='machine_failure',maxBins=5000)

In [35]:
pipe = Pipeline(
    stages=[
        type_indexer,
        type_encoder,
        assembler,
        scaler,
        dt_model
    ]
)

In [36]:
fit_model=pipe.fit(train_data)

In [37]:
results = fit_model.transform(test_data)

In [38]:
results.select(['machine_failure','prediction']).show()

+---------------+----------+
|machine_failure|prediction|
+---------------+----------+
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              1|       1.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
+---------------+----------+
only showing top 20 rows



### Evaluating the model

In [39]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderROC')
AUC = AUC_evaluator.evaluate(results)

print(f"The area under the curve is {AUC:.2f}")

The area under the curve is 0.95
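Cell [39] passes `rawPredictionCol='prediction'`, so the evaluator sees hard 0/1 labels rather than scores. The ROC curve then has a single interior point, and its area reduces to balanced accuracy, (TPR + TNR) / 2. The sketch below checks this with the decision tree's confusion-matrix counts from cell [42]; it assumes Spark closes the ROC curve at (0, 0) and (1, 1) and integrates with the trapezoidal rule. Pointing the evaluator at the classifier's `rawPrediction` column (the evaluator's default) would likely give a threshold-free AUC instead.

```python
# Decision-tree confusion-matrix counts from cell [42]: [[TN, FP], [FN, TP]].
tn, fp, fn, tp = 2753, 195, 3, 109

tpr = tp / (tp + fn)  # true-positive rate (recall)
fpr = fp / (fp + tn)  # false-positive rate

# Trapezoids under the curve (0, 0) -> (fpr, tpr) -> (1, 1).
roc_auc_hard = fpr * tpr / 2 + (1 - fpr) * (tpr + 1) / 2
balanced_accuracy = (tpr + (1 - fpr)) / 2

print(f"{roc_auc_hard:.4f} {balanced_accuracy:.4f}")  # 0.9535 0.9535
```

The same formula with the random forest's counts from cell [53] gives roughly 0.938, consistent with the 0.94 printed for that model.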


###  Area Under PR Curve

In [40]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results)

print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.3542407834844464
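The hard 0/1 scores also explain the low area under the PR curve. Assuming Spark builds the PR curve from the distinct score thresholds, prepends a point at recall 0 with the first precision, and integrates with the trapezoidal rule (our reading of `BinaryClassificationMetrics`), a 0/1 score yields roughly precision × recall plus a thin sliver. The sketch below computes this from the confusion-matrix counts reported for each model; the helper name is hypothetical.

```python
def hard_prediction_area_under_pr(tn, fp, fn, tp):
    """Trapezoidal area under the PR curve when every score is either 0 or 1."""
    # Threshold 1.0: a failure is predicted only where prediction == 1.
    recall_at_1 = tp / (tp + fn)
    precision_at_1 = tp / (tp + fp)
    # Threshold 0.0: every row is predicted as a failure, so recall is 1.
    precision_at_0 = (tp + fn) / (tn + fp + fn + tp)
    # Curve: (0, precision_at_1) -> (recall_at_1, precision_at_1) -> (1, precision_at_0).
    flat_part = recall_at_1 * precision_at_1
    sloped_part = (1 - recall_at_1) * (precision_at_1 + precision_at_0) / 2
    return flat_part + sloped_part


dt_area = hard_prediction_area_under_pr(2753, 195, 3, 109)  # decision tree, cell [42]
rf_area = hard_prediction_area_under_pr(2687, 261, 4, 108)  # random forest, cell [53]
print(f"{dt_area:.4f} {rf_area:.4f}")  # 0.3542 0.2881
```

Both values agree with the printed areas to many decimal places, which supports reading these numbers as single-threshold summaries rather than true ranking metrics.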


In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="machine_failure", predictionCol="prediction")
recall = evaluator.setMetricName("recallByLabel").setMetricLabel(1).evaluate(results)
print(f"The recall of the model is {recall}")

The recall of the model is 0.9732142857142857


In [42]:
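from sklearn.metrics import confusion_matrix

# Collect labels and predictions to the driver in a single pass (the test set is small)
pdf = results.select("machine_failure", "prediction").toPandas()

# Rows are actual classes, columns are predicted classes, both in label order (0, 1):
# [[TN, FP], [FN, TP]]
cnf_matrix = confusion_matrix(pdf["machine_failure"], pdf["prediction"])
print("Below is the confusion matrix: \n {}".format(cnf_matrix))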

Below is the confusion matrix: 
 [[2753  195]
 [   3  109]]


In [43]:
tn = cnf_matrix[0][0]
fp = cnf_matrix[0][1]
fn = cnf_matrix[1][0]
tp = cnf_matrix[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

In [44]:
print(f"Accuracy: {accuracy:.8f}")
print(f"Precision: {precision:.8f}")
print(f"Recall: {recall:.8f}")
print(f"F1 Score: {f1_score:.8f}")

Accuracy: 0.93529412
Precision: 0.35855263
Recall: 0.97321429
F1 Score: 0.52403846


## RandomForest Classifier

In [45]:
from pyspark.ml.classification import RandomForestClassifier

# Create a RandomForest classification model
rf_model = RandomForestClassifier(labelCol="machine_failure", featuresCol="features", numTrees=10)

In [46]:
from pyspark.ml import Pipeline

pipe_rf = Pipeline(
    stages=[
        type_indexer,
        type_encoder,
        assembler,
        scaler,
        rf_model
    ]
)

In [47]:
# Train the pipeline; the fitted PipelineModel is stored separately from the rf_model estimator
rf_pipeline_model = pipe_rf.fit(train_data)

In [48]:
results_rf = rf_pipeline_model.transform(test_data)

In [49]:
results_rf.select(['machine_failure','prediction']).show()

+---------------+----------+
|machine_failure|prediction|
+---------------+----------+
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              1|       1.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
+---------------+----------+
only showing top 20 rows



### Evaluating the model

In [50]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderROC')
AUC = AUC_evaluator.evaluate(results_rf)

print(f"The area under the curve is {AUC:.2f}")

The area under the curve is 0.94


### Area Under PR Curve

In [51]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results_rf)

print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.2881100407642732
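The PR figure has the same single-threshold origin. Assuming the PR curve starts at recall 0 with the precision of the first threshold, passes through the model's (recall, precision) point, and ends at recall 1 where every row is flagged (so precision equals the failure prevalence), the trapezoidal area reproduces the printed 0.2881. `single_threshold_pr_auc` is a hypothetical helper for illustration.

```python
def single_threshold_pr_auc(tn, fp, fn, tp):
    """Trapezoidal area under a PR curve built from hard 0/1 predictions."""
    recall = tp / (tp + fn)
    precision = tp / (tp + fp)
    # At threshold 0 every row is flagged, so precision equals prevalence.
    prevalence = (tp + fn) / (tn + fp + fn + tp)
    points = [(0.0, precision), (recall, precision), (1.0, prevalence)]
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))


# Random Forest counts from the confusion matrix in In [53]
rf_pr = single_threshold_pr_auc(2687, 261, 4, 108)
print(f"PR AUC: {rf_pr:.4f}")
```

Under these assumptions the same helper returns about 0.1265 for Linear SVC and 0.1461 for Logistic Regression, in line with their printed values.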


In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="machine_failure", predictionCol="prediction")
recall = evaluator.setMetricName("recallByLabel").setMetricLabel(1).evaluate(results_rf)
print(f"The recall of the model is {recall}")

The recall of the model is 0.9642857142857143


In [53]:
from sklearn.metrics import confusion_matrix

y_true = results_rf.select("machine_failure")
y_true = y_true.toPandas()
 
y_pred = results_rf.select("prediction")
y_pred = y_pred.toPandas()
 
cnf_matrix = confusion_matrix(y_true, y_pred)
print("Below is the confusion matrix: \n {}".format(cnf_matrix))

Below is the confusion matrix: 
 [[2687  261]
 [   4  108]]


In [54]:
tn = cnf_matrix[0][0]
fp = cnf_matrix[0][1]
fn = cnf_matrix[1][0]
tp = cnf_matrix[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

In [55]:
print(f"Accuracy: {accuracy:.8f}")
print(f"Precision: {precision:.8f}")
print(f"Recall: {recall:.8f}")
print(f"F1 Score: {f1_score:.8f}")

Accuracy: 0.91339869
Precision: 0.29268293
Recall: 0.96428571
F1 Score: 0.44906445
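Cells In [54], In [65] and In [79] repeat the same arithmetic. A small helper such as the hypothetical `metrics_from_confusion` below keeps the formulas in one place and returns 0.0 instead of failing (or yielding NaN) when a ratio is undefined, for example when a model never predicts a failure. It unpacks the `[[TN, FP], [FN, TP]]` layout that `sklearn.metrics.confusion_matrix` produces for labels 0 and 1, so a 2x2 `cnf_matrix` array can be passed in directly.

```python
def metrics_from_confusion(cm):
    """Accuracy, precision, recall and F1 for the positive class (label 1).

    `cm` is a 2x2 confusion matrix laid out as [[TN, FP], [FN, TP]].
    """
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total
    # Report 0.0 rather than dividing by zero when a ratio is undefined.
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}


# Random Forest confusion matrix from In [53]
rf = metrics_from_confusion([[2687, 261], [4, 108]])
for name, value in rf.items():
    print(f"{name}: {value:.8f}")
```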


## Linear SVC Model

In [56]:
from pyspark.ml.classification import LinearSVC

lsvc_model = LinearSVC(labelCol="machine_failure", featuresCol="features", maxIter=10, regParam=0.1)

In [57]:
pipe_svc = Pipeline(
    stages=[
        type_indexer,
        type_encoder,
        assembler,
        scaler,
        lsvc_model
    ]
)

In [58]:
# Train the model
lsvc_model = pipe_svc.fit(train_data)

In [59]:
results_svc = lsvc_model.transform(test_data)

In [60]:
results_svc.select(['machine_failure','prediction']).show()

+---------------+----------+
|machine_failure|prediction|
+---------------+----------+
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              1|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
+---------------+----------+
only showing top 20 rows



## Evaluating the Model

In [61]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderROC')
AUC = AUC_evaluator.evaluate(results_svc)

print(f"The area under the curve is {AUC:.2f}")

The area under the curve is 0.79


## Area Under PR Curve

In [62]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results_svc)

print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.1264635704228101


In [63]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="machine_failure", predictionCol="prediction")
recall = evaluator.setMetricName("recallByLabel").setMetricLabel(1).evaluate(results_svc)
print(f"The recall of the model is {recall}")

The recall of the model is 0.7678571428571429


In [64]:
from sklearn.metrics import confusion_matrix

y_true = results_svc.select("machine_failure")
y_true = y_true.toPandas()
 
y_pred = results_svc.select("prediction")
y_pred = y_pred.toPandas()
 
cnf_matrix = confusion_matrix(y_true, y_pred)
print("Below is the confusion matrix: \n {}".format(cnf_matrix))

Below is the confusion matrix: 
 [[2412  536]
 [  26   86]]


In [65]:
tn = cnf_matrix[0][0]
fp = cnf_matrix[0][1]
fn = cnf_matrix[1][0]
tp = cnf_matrix[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

In [66]:
print(f"Accuracy: {accuracy:.6f}")
print(f"Precision: {precision:.6f}")
print(f"Recall: {recall:.6f}")
print(f"F1 Score: {f1_score:.6f}")

Accuracy: 0.816340
Precision: 0.138264
Recall: 0.767857
F1 Score: 0.234332


## Logistic Regression

In [67]:
from pyspark.ml.classification import LogisticRegression
lr_model = LogisticRegression(featuresCol="features", labelCol="machine_failure")

In [68]:
pipe_lr = Pipeline(
    stages=[
        type_indexer,
        type_encoder,
        assembler,
        #scaler,
        lr_model
    ]
)

In [69]:
# Train the model
lr_model = pipe_lr.fit(train_data)

In [70]:
train_data.head()

Row(type='L', air_temperature_[k]=300.0, process_temperature_[k]=309.0, rotational_speed_[rpm]=1709, torque_[nm]=32.1, tool_wear_[min]=79, machine_failure=0)

In [71]:
train_data.dtypes

[('type', 'string'),
 ('air_temperature_[k]', 'double'),
 ('process_temperature_[k]', 'double'),
 ('rotational_speed_[rpm]', 'int'),
 ('torque_[nm]', 'double'),
 ('tool_wear_[min]', 'int'),
 ('machine_failure', 'int')]

In [72]:
results_lr = lr_model.transform(test_data)

In [73]:
results_lr.select(['machine_failure','prediction']).show()

+---------------+----------+
|machine_failure|prediction|
+---------------+----------+
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       1.0|
|              1|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
|              0|       0.0|
+---------------+----------+
only showing top 20 rows



## Evaluating the Model

In [74]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderROC')
AUC = AUC_evaluator.evaluate(results_lr)

print(f"The area under the curve is {AUC:.2f}")

The area under the curve is 0.83


## Area Under PR Curve

In [75]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='machine_failure',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(results_lr)

print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.14614929305055355


In [76]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="machine_failure", predictionCol="prediction")
recall = evaluator.setMetricName("recallByLabel").setMetricLabel(1).evaluate(results_lr)
print(f"The recall of the model is {recall}")


The recall of the model is 0.8303571428571429


In [77]:
from sklearn.metrics import confusion_matrix

y_true = results_lr.select("machine_failure")
y_true = y_true.toPandas()
 
y_pred = results_lr.select("prediction")
y_pred = y_pred.toPandas()
 
cnf_matrix = confusion_matrix(y_true, y_pred)
print("Below is the confusion matrix: \n {}".format(cnf_matrix))

Below is the confusion matrix: 
 [[2446  502]
 [  19   93]]


In [78]:
from sklearn.metrics import confusion_matrix, recall_score

y_true = results_lr.select("machine_failure").toPandas()
y_pred = results_lr.select("prediction").toPandas()

# Calculate confusion matrix
cnf_matrix = confusion_matrix(y_true, y_pred)
print("Below is the confusion matrix: \n {}".format(cnf_matrix))

# Calculate recall
recall = recall_score(y_true, y_pred)
print("Recall: {}".format(recall))


Below is the confusion matrix: 
 [[2446  502]
 [  19   93]]
Recall: 0.8303571428571429


In [79]:
tn = cnf_matrix[0][0]
fp = cnf_matrix[0][1]
fn = cnf_matrix[1][0]
tp = cnf_matrix[1][1]

accuracy = (tp+tn)/(tp+tn+fp+fn)
precision = tp/(tp+fp)
recall = tp/(tp+fn)
f1_score = 2*(precision*recall)/(precision+recall)

In [80]:
print(f"Accuracy: {accuracy:.8f}")
print(f"Precision: {precision:.8f}")
print(f"Recall: {recall:.8f}")
print(f"F1 Score: {f1_score:.8f}")

Accuracy: 0.82973856
Precision: 0.15630252
Recall: 0.83035714
F1 Score: 0.26308345


## Analysis

Decision Tree: Achieves the highest recall (0.97), so it identifies almost all actual failures, which is the critical requirement in predictive maintenance. Its precision is relatively low (0.36), meaning roughly two of every three failure alerts are false alarms. Its AUC of 0.95 indicates strong separation between failing and healthy machines at its operating threshold, and its area under the PR curve (0.35) is the highest of the four models, making it the most reliable of them for detecting failures.

Random Forest: 
Shows a slightly lower recall (0.96) than the Decision Tree, so it still catches nearly all actual failures. Its precision (0.29) is noticeably lower than the Decision Tree's (0.36), meaning a higher share of false alarms. With an AUC of 0.94, it is nearly as effective as the Decision Tree at distinguishing between classes. Its lower PR score (0.29) reflects that drop in precision rather than a loss of recall.

Linear SVC: 
Shows a significant drop in recall (0.77) compared with the tree-based models, so it misses more actual failures (26 of the 112 in the test set). Its precision is also low (0.14), reflecting a high number of false positives (536). The AUC of 0.79 indicates only moderate discriminative ability. With the lowest PR score (0.13), this model is the least effective in this predictive maintenance scenario, where high recall is crucial.

Logistic Regression: 
Offers a moderate recall of 0.83, better than Linear SVC but well below the tree-based models. Its precision (0.16) is slightly better than Linear SVC's but still low. An AUC of 0.83 indicates reasonable discriminative ability, while the PR score (0.15) shows that it struggles to combine precision with recall. On every metric reported here it sits between Linear SVC and the tree-based models, and it does not reach the recall of the Decision Tree or Random Forest.

Considering the critical importance of recall in predictive maintenance, the Decision Tree emerges as the most suitable choice because it has the highest recall. It detects almost all actual failures, at the cost of lower precision and therefore some false positives. The Random Forest follows closely, with similar recall but somewhat lower precision, F1 score, and AUC. Linear SVC and Logistic Regression trail the tree-based models on both precision and recall, and neither meets the high-recall requirement.
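The recall-first ranking argued in this analysis can be checked directly from the confusion matrices. The sketch below also reports F2, the F-beta score with beta = 2, which weights recall more heavily than precision and so expresses the "recall first" criterion as a single number; F2 is a suggested addition, not a metric this notebook computed. The Decision Tree counts (TN=2753, FP=195, FN=3, TP=109) are not printed in this section; they are reconstructed from its reported precision (0.35855263) and recall (0.97321429), assuming the same 3,060-row test split with 112 failures.

```python
def f_beta(precision, recall, beta):
    """F-beta score; beta > 1 weights recall more than precision."""
    b2 = beta ** 2
    denom = b2 * precision + recall
    return (1 + b2) * precision * recall / denom if denom else 0.0


# Confusion counts as (TN, FP, FN, TP) on the test split.
confusion = {
    "Decision Tree": (2753, 195, 3, 109),  # reconstructed from reported precision/recall
    "Random Forest": (2687, 261, 4, 108),
    "Linear SVC": (2412, 536, 26, 86),
    "Logistic Regression": (2446, 502, 19, 93),
}

rows = []
for model, (tn, fp, fn, tp) in confusion.items():
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    rows.append((model, recall, precision,
                 f_beta(precision, recall, 1.0), f_beta(precision, recall, 2.0)))

# Rank by recall first, then by F2 as a tie-breaker.
rows.sort(key=lambda r: (r[1], r[4]), reverse=True)
print(f"{'model':<20}{'recall':>8}{'prec':>8}{'F1':>8}{'F2':>8}")
for model, rec, prec, f1, f2 in rows:
    print(f"{model:<20}{rec:>8.3f}{prec:>8.3f}{f1:>8.3f}{f2:>8.3f}")
```

Ranking by F2 alone gives the same order as ranking by recall here, which supports choosing the Decision Tree without relying on recall in isolation.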

## Conclusion

In addressing the challenge of minimizing unplanned downtime in manufacturing through predictive maintenance, the Decision Tree is the most effective of the four models evaluated, with a recall of 97% (109 of the 112 failures in the test set). This reflects a strong ability to flag impending machine failures before they disrupt production, aligning with the business objective of optimizing efficiency and minimizing downtime. Deploying the Decision Tree model is recommended for real-time predictive maintenance, offering a proactive approach to the critical issue of equipment failures.