# Machine failure prediction

## Introduction

In industrial maintenance and operations, the timely detection of machine failures is crucial to prevent unexpected downtime, minimize production losses, and optimize maintenance strategies. Machine learning models have become valuable tools for predicting such failures. However, their effectiveness relies heavily on the quality and class balance of the dataset used for training.

Throughout this project, I will explore different techniques for addressing class imbalance, including oversampling and undersampling methods, such as Synthetic Minority Over-sampling Technique (SMOTE) and Random Under-Sampling (RUS). Additionally, I will investigate the impact of various feature engineering strategies, such as dimensionality reduction and feature selection, to improve the model's ability to discriminate between healthy and failing machines.

The performance of the developed binary classification models will be evaluated using accuracy, the confusion matrix, and the area under the receiver operating characteristic curve (AUC-ROC). The results will be compared with a baseline model trained on the original unbalanced dataset, highlighting how effectively the proposed techniques improve the model's performance on the minority class (failing machines).

In [0]:
!pip install imbalanced-learn
!pip install deepchecks --upgrade

## Loading the train and test data previously uploaded to Databricks

In [0]:
def load_datasets(files_location, file_type):
    for file_location in files_location:
        # CSV options
        infer_schema = "false"
        first_row_is_header = "true"
        delimiter = ","

        # The applied options are for CSV files. For other file types, these will be ignored.
        df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_location)
        yield df

In [0]:
# File location and type
files_location = ["/FileStore/tables/train.csv", "/FileStore/tables/test.csv"]
file_type = "csv"

train, test = load_datasets(files_location, file_type)

In [0]:
train.limit(5).toPandas()

In [0]:
test.limit(5).toPandas()

### Standardizing column names

In [0]:
import re

from pyspark.sql.functions import regexp_replace, col

def rename_lower_columns(dfs):
    for df in dfs:
        # Rename columns with spaces and units to snake_case names
        df = df.withColumnRenamed("Product Id", "product_id")\
                .withColumnRenamed("Air temperature [K]", "air_temperature")\
                .withColumnRenamed("Process temperature [K]", "process_temperature")\
                .withColumnRenamed("Rotational speed [rpm]", "rotational_speed")\
                .withColumnRenamed("Torque [Nm]", "torque")\
                .withColumnRenamed("Tool wear [min]", "tool_wear")\
                .withColumnRenamed("machine failure", "machine_failure")

        # Lowercase all column names and remove unwanted characters from them.
        # toDF renames the columns only; the column values are left untouched.
        df = df.toDF(*[re.sub(r"[ ,;{}()\n\t=]", "", c.lower()) for c in df.columns])
        yield df
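
The naming rule can be checked on plain strings without a Spark session. The sketch below is only an illustration: `normalize_column_name` is a hypothetical helper (not part of the notebook), and dropping the bracketed unit with a regex is an assumption that generalizes the explicit renames above.

```python
import re

def normalize_column_name(name: str) -> str:
    """Hypothetical helper: drop a bracketed unit, strip unwanted characters, snake_case the rest."""
    name = re.sub(r"\s*\[.*?\]", "", name)      # drop units such as " [K]" or " [rpm]"
    name = re.sub(r"[,;{}()\n\t=]", "", name)   # same unwanted characters as above (space handled below)
    return name.strip().lower().replace(" ", "_")

raw_columns = ["Product Id", "Air temperature [K]", "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]"]
clean_columns = [normalize_column_name(c) for c in raw_columns]
print(clean_columns)
```

Keeping the rule in a pure function like this makes it easy to test before applying it to the Spark DataFrames.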

In [0]:
train, test = rename_lower_columns([train, test])

In [0]:
train.limit(5).toPandas()

### Saving data

In [0]:
# Save table

spark.sql("DROP TABLE IF EXISTS train_csv")
spark.sql("DROP TABLE IF EXISTS test_csv")

train.write.mode("overwrite").option("header", "true").option("overwriteSchema", "True").format("csv")\
    .option("path", "/tables/train")\
    .saveAsTable("train")

test.write.mode("overwrite").option("header", "true").option("overwriteSchema", "True").format("csv")\
    .option("path", "/tables/test")\
    .saveAsTable("test")

## Exploratory Data Analysis

### Loading the table using Apache Spark SQL

In [0]:
%sql

CREATE OR REPLACE TABLE train_table
USING DELTA LOCATION "/tables/train_data"
AS (
  SELECT
    first(type) AS type,
    first(machine_failure) AS machine_failure,
    MAX(CAST(air_temperature AS INT)) AS max_air_temperature,
    MIN(CAST(air_temperature AS INT)) AS min_air_temperature,
    AVG(CAST(air_temperature AS INT)) AS avg_air_temperature,
    MAX(CAST(process_temperature AS INT)) AS max_process_temperature,
    MIN(CAST(process_temperature AS INT)) AS min_process_temperature,
    AVG(CAST(process_temperature AS INT)) AS avg_process_temperature,
    MAX(CAST(rotational_speed AS INT)) AS max_rotational_speed,
    MIN(CAST(rotational_speed AS INT)) AS min_rotational_speed,
    AVG(CAST(rotational_speed AS INT)) AS avg_rotational_speed,
    MAX(CAST(torque AS INT)) AS max_torque,
    MIN(CAST(torque AS INT)) AS min_torque,
    AVG(CAST(torque AS INT)) AS avg_torque,
    MAX(CAST(tool_wear AS INT)) AS max_tool_wear,
    MIN(CAST(tool_wear AS INT)) AS min_tool_wear,
    AVG(CAST(tool_wear AS INT)) AS avg_tool_wear
  FROM train
  GROUP BY product_id
)
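
For readers more familiar with pandas, the aggregation in the query above can be sketched as a `groupby` with named aggregations. The data here is made up for illustration and only one sensor column (`torque`) is shown; the real table aggregates five.

```python
import pandas as pd

# Hypothetical readings: several rows per product_id, as assumed by the GROUP BY
readings = pd.DataFrame({
    "product_id": ["M1", "M1", "L2", "L2", "L2"],
    "type": ["M", "M", "L", "L", "L"],
    "machine_failure": [0, 0, 1, 1, 1],
    "torque": [40, 44, 50, 56, 62],
})

# One row per product: first label/type seen, plus max/min/mean of the sensor value
per_product = readings.groupby("product_id").agg(
    type=("type", "first"),
    machine_failure=("machine_failure", "first"),
    max_torque=("torque", "max"),
    min_torque=("torque", "min"),
    avg_torque=("torque", "mean"),
).reset_index()
print(per_product)
```

Note that taking the first `type` and `machine_failure` per product only makes sense if those values are constant within a product, which this sketch assumes.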



**Using the DESCRIBE statement to output table details such as data types**

In [0]:
%sql
-- Print details 
DESCRIBE EXTENDED train_table



In [0]:
%sql

-- Select the first 10 rows
SELECT * 
FROM train_table
LIMIT 10



In [0]:
%sql

-- plot total by machine type
SELECT type, count(*) AS total
FROM train_table
GROUP BY type



This plot displays the distribution of machine types based on the total number of machines. Ideally, we aim for a relatively balanced distribution across all machine types. Having a similar number of machines for each type ensures that our model is not biased or overly influenced by the machine type feature.

In [0]:
%sql 

-- plot total by machine status (failure or not)
SELECT type, machine_failure, COUNT(*) AS total
FROM train_table
GROUP BY type, machine_failure



This plot reveals an important insight: the failure percentage does not exceed 2% for any machine type. In other words, the dataset is heavily imbalanced: failures are rare compared with non-failures, regardless of machine type. From a modeling perspective, it is crucial to address this class imbalance so that it does not adversely impact the model's performance.
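
The failure share per machine type can also be computed directly from the grouped counts. The numbers below are invented to mirror the shape of the query result (`type`, `machine_failure`, `total`); they are not the notebook's actual output.

```python
import pandas as pd

# Hypothetical counts shaped like the query result above
counts = pd.DataFrame({
    "type": ["L", "L", "M", "M", "H", "H"],
    "machine_failure": [0, 1, 0, 1, 0, 1],
    "total": [5880, 120, 2955, 45, 990, 10],
})

# Share of each (type, machine_failure) group within its machine type
totals_per_type = counts.groupby("type")["total"].transform("sum")
counts["share"] = counts["total"] / totals_per_type

failure_rate = counts.loc[counts["machine_failure"] == 1].set_index("type")["share"]
print(failure_rate.round(3))
```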

## Modeling

**Benchmark Model**

In [0]:
# read table 
df = spark.table("train_table").toPandas()
df.head(5)



In [0]:
from sklearn.metrics import accuracy_score, roc_auc_score, ConfusionMatrixDisplay, RocCurveDisplay, roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelBinarizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.compose import ColumnTransformer

y_scores = []

X = df.drop("machine_failure", axis=1)
y = df["machine_failure"]

# Split data into train and validation
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.15, random_state=42)

# Preprocess categorical and numerical features separately
categorical_features = ['type']
numerical_features = df.drop(["type", "machine_failure"], axis=1).columns

# Create transformers for each feature type
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(), categorical_features),
        ('num', StandardScaler(), numerical_features)
    ])

# Create a pipeline
benchmark = make_pipeline(preprocessor, LogisticRegression())

# Fit the pipeline to the train set
benchmark.fit(X_train, y_train)

# Compute probabilities
y_score = benchmark.predict_proba(X_val)
y_scores.append(y_score)

# Compute the accuracy on the full dataset (note: X also contains the training rows)
print(benchmark.score(X, y))




The accuracy achieved is remarkably high, with nearly 100% correct predictions. However, this number is misleading because of the issue mentioned previously: the dataset suffers from a severe class imbalance, with 98% of cases representing non-failure instances. A model that always predicts "no failure" would therefore already be right about 98% of the time, without identifying a single failure.

To gain a more complete picture of our model's performance, let's look at the ROC curve with its AUC and at the confusion matrix. These metrics give a more faithful assessment of the model's predictive ability on imbalanced datasets, because they show how well the model distinguishes between failure and non-failure cases.
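
A small sketch of why accuracy alone is not enough here: on synthetic labels with 2% failures, a baseline that always predicts the majority class scores high on accuracy and zero on recall for the failure class. The data and the `DummyClassifier` baseline are illustrative assumptions, not part of the notebook's pipeline.

```python
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.metrics import accuracy_score, recall_score

# Synthetic labels: 2% failures (1), 98% non-failures (0)
y_demo = np.array([1] * 20 + [0] * 980)
X_demo = np.zeros((len(y_demo), 1))  # features are irrelevant for this baseline

# Always predict the most frequent class ("no failure")
baseline = DummyClassifier(strategy="most_frequent").fit(X_demo, y_demo)
y_pred = baseline.predict(X_demo)

print("accuracy:", accuracy_score(y_demo, y_pred))
print("failure recall:", recall_score(y_demo, y_pred))
```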

In [0]:
RocCurveDisplay.from_estimator(benchmark, X, y)



Oops! The ROC curve tells a different story from the initially high accuracy. The AUC is only about 0.77, which means that a randomly chosen failing machine receives a higher predicted failure score than a randomly chosen healthy one only about 77% of the time (0.5 would correspond to random guessing). Now let's use the confusion matrix to investigate further what is happening with our model's performance.
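
The AUC can be read as a ranking probability: the chance that a randomly chosen positive gets a higher score than a randomly chosen negative. The sketch below checks this on synthetic scores (an assumption for illustration) by comparing an explicit pairwise count with `roc_auc_score`.

```python
import numpy as np
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(0)
# Synthetic scores: failures (label 1) tend to score higher than non-failures (label 0)
y_true = np.array([1] * 30 + [0] * 300)
scores = np.concatenate([rng.normal(1.0, 1.0, 30), rng.normal(0.0, 1.0, 300)])

pos, neg = scores[y_true == 1], scores[y_true == 0]
# Fraction of (failure, non-failure) pairs where the failure scores higher; ties count half
diff = pos[:, None] - neg[None, :]
pairwise = (diff > 0).mean() + 0.5 * (diff == 0).mean()

print(pairwise, roc_auc_score(y_true, scores))
```

This is why AUC is not the share of correctly classified cases: it measures how well the scores rank the two classes, independently of any decision threshold.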

In [0]:
ConfusionMatrixDisplay.from_estimator(benchmark, X, y)



Finally, we have uncovered the truth about our model's performance. The model excels at classifying non-failing machines, but it fares poorly on failure cases: it does not correctly classify a single failing machine.

It is imperative to address the significant imbalance between failure and non-failure cases in our dataset, as any misclassification could have severe consequences. We must now focus on improving our model's ability to accurately identify and classify failure instances, as it is a critical aspect of our application.

## Bootstrap sample the minority class

Randomly duplicate examples in the minority class by sampling with replacement.
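
To make the idea concrete, here is a minimal NumPy sketch of random oversampling on toy data. It is a hand-rolled illustration of the technique under simple assumptions (two classes, balance to a 1:1 ratio), not the internals of `RandomOverSampler`.

```python
import numpy as np

rng = np.random.default_rng(42)
y_toy = np.array([0] * 95 + [1] * 5)
X_toy = np.arange(len(y_toy)).reshape(-1, 1)  # row ids stand in for features

minority_idx = np.flatnonzero(y_toy == 1)
majority_idx = np.flatnonzero(y_toy == 0)

# Draw minority rows with replacement until both classes have the same size
extra = rng.choice(minority_idx, size=len(majority_idx) - len(minority_idx), replace=True)
keep = np.concatenate([majority_idx, minority_idx, extra])

X_over_toy, y_over_toy = X_toy[keep], y_toy[keep]
n_per_class = np.bincount(y_over_toy)                           # class counts after resampling
n_distinct_minority = len(np.unique(X_over_toy[y_over_toy == 1]))  # distinct minority rows
print(n_per_class, n_distinct_minority)
```

The classes end up balanced, but the minority class still contains only its original distinct rows: no new information is created, only repeated.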

In [0]:
from imblearn.over_sampling import RandomOverSampler

oversample = RandomOverSampler(sampling_strategy='minority')

X_over, y_over = oversample.fit_resample(X, y)

print(f"Result: {y_over.value_counts()/len(y_over)}")



In [0]:
# Split data into train and validation
X_train, X_val, y_train, y_val = train_test_split(X_over, y_over, test_size=0.10, random_state=42)

# Preprocess categorical and numerical features separately
categorical_features = ['type']
numerical_features = df.drop(["type", "machine_failure"], axis=1).columns

# Create transformers for each feature type
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(), categorical_features),
        ('num', StandardScaler(), numerical_features)
    ])

# Create a pipeline
pipeline_over = make_pipeline(preprocessor, LogisticRegression())

# Fit the pipeline to the train set
pipeline_over.fit(X_train, y_train)

# Compute probabilities
y_score = pipeline_over.predict_proba(X_val)
y_scores.append(y_score)

# Compute the accuracy on the original, imbalanced dataset (note: it includes rows seen during training)
print(pipeline_over.score(X, y))
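
One caveat worth checking with this setup: because `X` and `y` are resampled before the split, copies of the same failure row can land in both the train and validation parts. A possible alternative, sketched below on synthetic data, is to split first and oversample only the training part. All names here (`X_syn`, `y_syn`, `X_ho`, ...) are hypothetical and the hand-rolled oversampling stands in for `RandomOverSampler`.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import recall_score
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
# Synthetic stand-in for the real features: about 3% positives, shifted by one unit
y_syn = (rng.random(2000) < 0.03).astype(int)
X_syn = rng.normal(size=(2000, 2)) + y_syn[:, None]

# 1) Split first, stratified so both parts keep the original class ratio
X_tr, X_ho, y_tr, y_ho = train_test_split(
    X_syn, y_syn, test_size=0.15, stratify=y_syn, random_state=42
)

# 2) Oversample the minority class in the training part only
min_idx = np.flatnonzero(y_tr == 1)
maj_idx = np.flatnonzero(y_tr == 0)
extra = rng.choice(min_idx, size=len(maj_idx) - len(min_idx), replace=True)
idx = np.concatenate([maj_idx, min_idx, extra])

# 3) Fit on the balanced training data, evaluate on the untouched hold-out
model = LogisticRegression().fit(X_tr[idx], y_tr[idx])
print("hold-out failure recall:", recall_score(y_ho, model.predict(X_ho)))
```

The hold-out part keeps the original imbalance, so metrics computed on it reflect the conditions the model would face in practice.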



## Undersample the majority class

Randomly delete examples from the majority class.
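
A minimal pandas sketch of the same idea on toy data: keep as many rows per class as the minority class has, sampled without replacement. This illustrates the technique only; it is not how `RandomUnderSampler` is implemented, and the `toy` frame is made up.

```python
import pandas as pd

toy = pd.DataFrame({"feature": range(100), "label": [0] * 95 + [1] * 5})

n_minority = toy["label"].value_counts().min()
# Keep n_minority random rows per class, sampled without replacement
balanced = toy.groupby("label").sample(n=n_minority, random_state=42)

print(balanced["label"].value_counts())
print(f"rows kept: {len(balanced)} of {len(toy)}")
```

The trade-off is visible in the row count: balance is reached by discarding most of the majority class, which can throw away useful information when the minority class is very small.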

In [0]:
from imblearn.under_sampling import RandomUnderSampler

undersample = RandomUnderSampler(sampling_strategy='majority')

X_under, y_under = undersample.fit_resample(X, y)

print(f"Result: {y_under.value_counts()/len(y_under)}")



In [0]:
# Split data into train and validation
X_train, X_val, y_train, y_val = train_test_split(X_under, y_under, test_size=0.10, random_state=42)

# Preprocess categorical and numerical features separately
categorical_features = ['type']
numerical_features = df.drop(["type", "machine_failure"], axis=1).columns

# Create transformers for each feature type
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(), categorical_features),
        ('num', StandardScaler(), numerical_features)
    ])

# Create a pipeline
pipeline_under = make_pipeline(preprocessor, LogisticRegression())

# Fit the pipeline to the train set
pipeline_under.fit(X_train, y_train)

# Predict the validation set
y_predicted = pipeline_under.predict(X_val)

# Compute probabilities
y_score = pipeline_under.predict_proba(X_val)
y_scores.append(y_score)

# Compute the accuracy on the original, imbalanced dataset (note: it includes rows seen during training)
print(pipeline_under.score(X, y))



With oversampling and undersampling, the accuracy score is worse than before, so we need to look at other metrics such as the confusion matrix and the AUC.

In [0]:

models_name = ["Benchmark", "Oversample", "Undersample"] 
for model, name in zip([benchmark, pipeline_over, pipeline_under], models_name):
    RocCurveDisplay.from_estimator(model, X, y, name=name)



In [0]:
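import matplotlib.pyplot as plt

models_name = ["Benchmark", "Oversample", "Undersample"]
for model, name in zip([benchmark, pipeline_over, pipeline_under], models_name):
    # One confusion matrix per model, computed on the original (imbalanced) data
    ConfusionMatrixDisplay.from_estimator(model, X, y)
    plt.title(name)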



## SMOTE