## Use Case 3: ATM Operations - Downtime and Maintenance Analysis

**Objective:** Analyze ATM downtime patterns to optimize maintenance schedules and improve service reliability.

**Dataset:** `predictive_maintenance.csv`

**Key Features:**
- Product ID and machine type (`Type`)
- Air and process temperature, rotational speed, torque
- Tool wear
- Failure types (Heat, Power, Overstrain, etc.)
- Target variable: Machine failure (0 = No failure, 1 = Failure)


In [0]:
# Use Case 3: ATM Operations - Data Loading
from pyspark.sql.functions import col, count, when

# Load ATM maintenance dataset
df_atm = spark.read.csv('/Volumes/workspace/default/capstone-project/predictive_maintenance.csv', 
                        header=True, inferSchema=True)

print("Dataset loaded successfully!")
print(f"Total Records: {df_atm.count()}")
print(f"Total Columns: {len(df_atm.columns)}")

# Display schema
print("\n=== Dataset Schema ===")
df_atm.printSchema()

# Display sample data
display(df_atm.limit(10))


Dataset loaded successfully!
Total Records: 10000
Total Columns: 10

=== Dataset Schema ===
root
 |-- UDI: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air temperature [K]: double (nullable = true)
 |-- Process temperature [K]: double (nullable = true)
 |-- Rotational speed [rpm]: integer (nullable = true)
 |-- Torque [Nm]: double (nullable = true)
 |-- Tool wear [min]: integer (nullable = true)
 |-- Target: integer (nullable = true)
 |-- Failure Type: string (nullable = true)



UDI,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Target,Failure Type
1,M14860,M,298.1,308.6,1551,42.8,0,0,No Failure
2,L47181,L,298.2,308.7,1408,46.3,3,0,No Failure
3,L47182,L,298.1,308.5,1498,49.4,5,0,No Failure
4,L47183,L,298.2,308.6,1433,39.5,7,0,No Failure
5,L47184,L,298.2,308.7,1408,40.0,9,0,No Failure
6,M14865,M,298.1,308.6,1425,41.9,11,0,No Failure
7,L47186,L,298.1,308.6,1558,42.4,14,0,No Failure
8,L47187,L,298.1,308.6,1527,40.2,16,0,No Failure
9,M14868,M,298.3,308.7,1667,28.6,18,0,No Failure
10,M14869,M,298.5,309.0,1741,28.0,21,0,No Failure
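
Several column names in the schema above contain spaces and square brackets, which is why later cells wrap them in backticks. One alternative is to normalize the names once after loading. The sketch below is plain Python; `sanitize_column_name` is a hypothetical helper introduced here for illustration and is not part of the notebook or of PySpark.

```python
import re

def sanitize_column_name(name: str) -> str:
    """Lowercase a column name and replace runs of non-alphanumerics with '_'."""
    cleaned = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return cleaned.strip("_").lower()

# Column names copied from the schema printed above
original_columns = [
    "UDI", "Product ID", "Type", "Air temperature [K]",
    "Process temperature [K]", "Rotational speed [rpm]",
    "Torque [Nm]", "Tool wear [min]", "Target", "Failure Type",
]

renamed_columns = [sanitize_column_name(c) for c in original_columns]
for old, new in zip(original_columns, renamed_columns):
    print(f"{old!r} -> {new!r}")
```

In the notebook, the new names could be applied with `df_atm.toDF(*renamed_columns)`. Later cells would then need to refer to the renamed columns instead of the backtick-quoted originals.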


In [0]:
# Use Case 3: Data Cleaning & Exploratory Analysis
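from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import IntegerType, LongType, FloatType, DoubleType, StringType

# Check for missing values, split by column type (isnan only applies to numeric columns)
print("=== Missing Values Analysis ===")
# Use isinstance rather than comparing str(field.dataType) to bare class names:
# on recent PySpark versions the string form is e.g. 'IntegerType()', so a string
# comparison can match nothing and silently skip both checks below.
numeric_cols = [field.name for field in df_atm.schema.fields
                if isinstance(field.dataType, (IntegerType, LongType, FloatType, DoubleType))]
string_cols = [field.name for field in df_atm.schema.fields
               if isinstance(field.dataType, StringType)]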

# Check missing values for numeric columns
if numeric_cols:
    df_atm.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) 
                   for c in numeric_cols]).show()

# Check missing values for string columns
if string_cols:
    df_atm.select([count(when(col(c).isNull(), c)).alias(c) 
                   for c in string_cols]).show()

# Remove duplicates
df_atm = df_atm.dropDuplicates()
print(f"\nRecords after removing duplicates: {df_atm.count()}")

# Analyze failure distribution
print("\n=== Machine Failure Distribution ===")
failure_col = [c for c in df_atm.columns if 'failure' in c.lower() or 'target' in c.lower()]
if failure_col:
    target_col = failure_col[0]
    df_atm.groupBy(target_col).count().show()
else:
    # If no obvious target column, check for binary columns
    print("Available columns:", df_atm.columns)

# Display basic statistics
print("\n=== Dataset Statistics ===")
df_atm.describe().show()

print("\n✅ Data cleaning completed!")


=== Missing Values Analysis ===

Records after removing duplicates: 10000

=== Machine Failure Distribution ===
+------+-----+
|Target|count|
+------+-----+
|     0| 9661|
|     1|  339|
+------+-----+


=== Dataset Statistics ===
+-------+------------------+----------+-----+-------------------+-----------------------+----------------------+-----------------+-----------------+-------------------+--------------------+
|summary|               UDI|Product ID| Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|      Torque [Nm]|  Tool wear [min]|             Target|        Failure Type|
+-------+------------------+----------+-----+-------------------+-----------------------+----------------------+-----------------+-----------------+-------------------+--------------------+
|  count|             10000|     10000|10000|              10000|                  10000|                 10000|            10000|            10000|              10000|               10000|
|   mean|
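
The failure distribution printed above is heavily imbalanced, which matters when reading any accuracy figure for this dataset. The plain-Python sketch below works only from the two reported counts. The class-weight ratio at the end is one common heuristic, shown as an assumption and not something the notebook uses.

```python
# Class counts copied from the "Machine Failure Distribution" output above
no_failure_count = 9661
failure_count = 339
total = no_failure_count + failure_count

# Share of rows labelled as failures
failure_rate = failure_count / total

# Accuracy of a trivial model that always predicts "no failure"
majority_baseline_accuracy = no_failure_count / total

# One common (not the only) way to up-weight the minority class
failure_class_weight = no_failure_count / failure_count

print(f"Failure rate: {failure_rate:.4f}")
print(f"Majority-class baseline accuracy: {majority_baseline_accuracy:.4f}")
print(f"Failure class weight (ratio): {failure_class_weight:.1f}")
```

Any classifier trained on this data needs to beat the majority-class baseline by a clear margin before its accuracy says much about failure detection.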

In [0]:
# Use Case 3: Feature Engineering
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler

# Identify numeric columns (use backticks for column names with special characters)
feature_cols = [
    "`Air temperature [K]`",
    "`Process temperature [K]`",
    "`Rotational speed [rpm]`",
    "`Torque [Nm]`",
    "`Tool wear [min]`"
]

print(f"Feature columns: {feature_cols}")

# Target column
target_col = "Target"
print(f"Target column: {target_col}")

# Handle categorical column (Type column)
indexer = StringIndexer(inputCol="Type", outputCol="Type_indexed")
df_atm = indexer.fit(df_atm).transform(df_atm)
feature_cols.append("Type_indexed")

# Assemble features
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
    handleInvalid="skip"
)

df_assembled = assembler.transform(df_atm)

# Standardize features (the scaler is fit on the full dataset here, before the train/test split)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

print("\n✅ Feature engineering completed!")
print(f"Total features: {len(feature_cols)}")
display(df_scaled.select("features", "`Air temperature [K]`", "`Torque [Nm]`").limit(5))


Feature columns: ['`Air temperature [K]`', '`Process temperature [K]`', '`Rotational speed [rpm]`', '`Torque [Nm]`', '`Tool wear [min]`']
Target column: Target

✅ Feature engineering completed!
Total features: 6


features,Air temperature [K],Torque [Nm]
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""-0.8523547551938457"",""-1.0821075494924532"",""0.30244679386499684"",""-0.2996218133613191"",""1.0376228963846548"",""-0.7452693087793546""]}",298.3,37.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""2.0472702030867738"",""2.355165739835387"",""-0.3891929155483674"",""0.4627465812492598"",""0.07931926302966297"",""-0.7452693087793546""]}",304.1,44.6
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.64745125770992"",""0.4006377910019333"",""-0.573258322085795"",""0.42262192890133476"",""-0.5019468752348403"",""-0.7452693087793546""]}",301.3,44.2
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.5974577239464497"",""1.074612945772095"",""-0.6457689367823574"",""0.8038061262066238"",""1.4303702871039137"",""0.7443755218809592""]}",301.2,48.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.14751592007533137"",""1.074612945772095"",""1.462616629009995"",""-1.643797667016813"",""-0.17203906703066274"",""2.234020352541273""]}",300.3,23.6


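
With `withMean=True` and `withStd=True`, the scaler centers each feature and divides by its standard deviation. The cell above fits those statistics on the whole dataset. A common alternative is to fit them on the training rows only and reuse them for the test rows. The sketch below illustrates that idea in plain Python on the ten `Torque [Nm]` values from the sample rows. `fit_scaler` and `apply_scaler` are hypothetical helpers, the 8/2 split is arbitrary, and the use of the sample standard deviation is an assumption about how Spark computes it.

```python
import statistics

def fit_scaler(values):
    """Return (mean, sample standard deviation) computed from the given values."""
    return statistics.mean(values), statistics.stdev(values)

def apply_scaler(values, mean, std):
    """Standardize values using previously fitted statistics."""
    return [(v - mean) / std for v in values]

# Torque [Nm] values from the ten sample rows displayed after loading
torque = [42.8, 46.3, 49.4, 39.5, 40.0, 41.9, 42.4, 40.2, 28.6, 28.0]

# Hypothetical 8/2 split, for illustration only
train, test = torque[:8], torque[8:]

# Statistics come from the training rows only
mean, std = fit_scaler(train)
train_scaled = apply_scaler(train, mean, std)

# The test rows reuse the training statistics instead of their own
test_scaled = apply_scaler(test, mean, std)

print(f"train mean={mean:.3f}, std={std:.3f}")
print("scaled test values:", [round(z, 3) for z in test_scaled])
```

For tree-based models such as Random Forest, feature scaling generally does not change the learned splits, so the fit order matters more for scale-sensitive models.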

In [0]:
# Use Case 3: Predictive Maintenance Model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Target column
target_col = "Target"

# Split data into train and test
train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train_data.count()} records")
print(f"Test set: {test_data.count()} records")

# Train Random Forest model
print("\n🔧 Training Random Forest model...")
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=target_col,
    predictionCol="prediction",
    numTrees=100,
    maxDepth=10,
    seed=42
)

rf_model = rf.fit(train_data)

# Make predictions
predictions = rf_model.transform(test_data)

# Evaluate model
auc_evaluator = BinaryClassificationEvaluator(labelCol=target_col, metricName="areaUnderROC")
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol=target_col, metricName="accuracy")

auc = auc_evaluator.evaluate(predictions)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"\n✅ Model Performance:")
print(f"   • AUC-ROC Score: {auc:.4f}")
print(f"   • Accuracy: {accuracy:.4f}")

# Display predictions
print("\n=== Sample Predictions ===")
display(predictions.select(target_col, "prediction", "probability", 
                           "`Air temperature [K]`", "`Torque [Nm]`", "`Tool wear [min]`").limit(20))


Training set: 8079 records
Test set: 1921 records

🔧 Training Random Forest model...

✅ Model Performance:
   • AUC-ROC Score: 0.9784
   • Accuracy: 0.9755

=== Sample Predictions ===


Target,prediction,probability,Air temperature [K],Torque [Nm],Tool wear [min]
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9992290631172329"",""7.709368827670408E-4""]}",298.1,49.4,5
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.999327475576032"",""6.725244239681401E-4""]}",298.1,42.4,14
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9988088599456602"",""0.0011911400543398014""]}",298.3,28.6,18
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9990057403531432"",""9.942596468567712E-4""]}",298.6,30.0,37
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9991265393704929"",""8.734606295071489E-4""]}",298.9,32.5,55
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9987763485440094"",""0.0012236514559906193""]}",299.0,25.7,68
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.998916253978205"",""0.0010837460217949183""]}",299.0,30.1,84
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9991194231035594"",""8.805768964405576E-4""]}",298.8,48.6,98
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9991165034200891"",""8.834965799108332E-4""]}",298.8,49.1,128
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9983455356496245"",""0.0016544643503754806""]}",298.7,25.8,130
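
Because only about 3.4% of the rows are failures (339 of 10,000 in the distribution output), a model that always predicts "no failure" would already score roughly 0.966 accuracy. The reported 0.9755 is therefore best read alongside precision and recall for the failure class. The sketch below shows how those metrics are computed. The labels and predictions are toy values invented for illustration, not notebook output, and the function names are hypothetical.

```python
def confusion_counts(labels, predictions):
    """Return (tp, fp, fn, tn), treating label 1 (failure) as the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    return tp, fp, fn, tn

def precision_recall_f1(labels, predictions):
    """Precision, recall and F1 for the failure class (0.0 when undefined)."""
    tp, fp, fn, _ = confusion_counts(labels, predictions)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1

# Toy data, invented for illustration (NOT taken from the model above)
labels      = [0, 0, 0, 0, 0, 0, 0, 1, 1, 1]
predictions = [0, 0, 0, 0, 0, 0, 1, 1, 0, 0]

accuracy = sum(y == p for y, p in zip(labels, predictions)) / len(labels)
precision, recall, f1 = precision_recall_f1(labels, predictions)
print(f"accuracy={accuracy:.2f} precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")
```

In the notebook itself, a similar figure can likely be obtained from `MulticlassClassificationEvaluator` by setting `metricName="recallByLabel"` (or `"precisionByLabel"`) together with `metricLabel=1.0`, assuming a Spark version that supports those options.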


In [0]:
# Use Case 3: Maintenance Insights & Recommendations
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import DoubleType

# Analyze failure patterns
print("=== Failure Analysis ===")

# Get feature importance
feature_importance = rf_model.featureImportances
importance_list = [(feature_cols[i].replace("`", ""), float(feature_importance[i])) 
                   for i in range(len(feature_cols))]
importance_sorted = sorted(importance_list, key=lambda x: x[1], reverse=True)

print("\n📊 Top Features for Predicting Failures:")
for i, (feature, importance) in enumerate(importance_sorted, 1):
    print(f"{i}. {feature}: {importance:.4f}")

# Extract probability of failure (class 1) using UDF
def get_prob_failure(probability):
    return float(probability[1]) if probability else 0.0

get_prob_udf = udf(get_prob_failure, DoubleType())

predictions_with_prob = predictions.withColumn("prob_failure", get_prob_udf(col("probability")))

# Identify high-risk machines
high_risk_threshold = 0.7
df_risk = predictions_with_prob.withColumn(
    "risk_level",
    when(col("prob_failure") >= high_risk_threshold, "High Risk")
    .when(col("prob_failure") >= 0.4, "Medium Risk")
    .otherwise("Low Risk")
)

print("\n=== Risk Distribution ===")
df_risk.groupBy("risk_level").count().orderBy("risk_level").show()

# Maintenance recommendations
print("\n=== Maintenance Recommendations ===")
high_risk_machines = df_risk.filter(col("risk_level") == "High Risk")
print(f"🔴 High Risk Machines: {high_risk_machines.count()} machines require immediate maintenance")

medium_risk_machines = df_risk.filter(col("risk_level") == "Medium Risk")
print(f"🟡 Medium Risk Machines: {medium_risk_machines.count()} machines need preventive maintenance")

low_risk_machines = df_risk.filter(col("risk_level") == "Low Risk")
print(f"🟢 Low Risk Machines: {low_risk_machines.count()} machines are operating normally")

# Display high-risk machines
print("\n=== High Risk ATMs Requiring Immediate Attention ===")
display(high_risk_machines.select("risk_level", "prediction", "prob_failure", 
                                   "`Air temperature [K]`", "`Torque [Nm]`", 
                                   "`Tool wear [min]`", "Type").limit(15))

print("\n✅ Use Case 3: ATM Maintenance Analysis Complete!")


=== Failure Analysis ===

📊 Top Features for Predicting Failures:
1. Air temperature [K]: 0.2118
2. Tool wear [min]: 0.2027
3. Torque [Nm]: 0.2023
4. Process temperature [K]: 0.1921
5. Rotational speed [rpm]: 0.1385
6. Type_indexed: 0.0525

=== Risk Distribution ===
+-----------+-----+
| risk_level|count|
+-----------+-----+
|  High Risk|   21|
|   Low Risk| 1867|
|Medium Risk|   33|
+-----------+-----+


=== Maintenance Recommendations ===
🔴 High Risk Machines: 21 machines require immediate maintenance
🟡 Medium Risk Machines: 33 machines need preventive maintenance
🟢 Low Risk Machines: 1867 machines are operating normally

=== High Risk ATMs Requiring Immediate Attention ===


risk_level,prediction,prob_failure,Air temperature [K],Torque [Nm],Tool wear [min],Type
High Risk,1.0,0.819520202020202,298.9,65.7,191,L
High Risk,1.0,0.9582692307692308,298.4,60.7,216,L
High Risk,1.0,0.8701123680241327,296.8,62.0,199,L
High Risk,1.0,0.7184999999999999,299.2,60.7,191,L
High Risk,1.0,0.7525641025641024,302.0,53.1,212,L
High Risk,1.0,0.7363364889386321,301.9,62.8,22,L
High Risk,1.0,0.7545771243405862,301.8,50.5,64,L
High Risk,1.0,0.748408114722543,302.0,57.6,197,L
High Risk,1.0,0.7109168719104939,302.2,51.6,12,M
High Risk,1.0,0.831946434876278,302.4,45.5,53,L



✅ Use Case 3: ATM Maintenance Analysis Complete!
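
The risk tiers above are plain probability thresholds (0.7 and 0.4) applied to the predicted failure probability. The plain-Python sketch below restates that rule so the band edges are easy to check. `risk_level` is a hypothetical helper. The first three probabilities are copied from the outputs above, and the last two are made up to land in the medium band.

```python
def risk_level(prob_failure, high=0.7, medium=0.4):
    """Map a failure probability to the notebook's three risk tiers."""
    if prob_failure >= high:
        return "High Risk"
    if prob_failure >= medium:
        return "Medium Risk"
    return "Low Risk"

samples = [
    0.819520202020202,      # from the high-risk table above
    0.7109168719104939,     # from the high-risk table above
    7.709368827670408e-4,   # from the sample predictions output
    0.55,                   # made-up value for illustration
    0.45,                   # made-up value for illustration
]

for p in samples:
    print(f"{p:.4f} -> {risk_level(p)}")
```

Assuming the classifier's default decision rule (predict the class with the higher probability), the medium band from 0.4 to 0.7 mixes rows predicted as failures with rows predicted as healthy, so it is worth reviewing separately from the `prediction` column.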
