In [12]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

In [13]:
# Create the Spark session used by the rest of the notebook, or reuse one that
# is already running.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)


In [14]:
# Location of the alerts extract. The original workstation path stays the
# default so existing runs behave exactly as before; set the ALERTS_CSV_PATH
# environment variable to point the notebook at a copy stored elsewhere
# without editing this cell.
ALERTS_CSV_PATH = os.environ.get(
    "ALERTS_CSV_PATH",
    r"C:\Users\Son Phan\Downloads\alerts_output\alerts.csv",
)

# The first line of the file holds the column names; column types are inferred
# from the data rather than declared.
df = spark.read.csv(ALERTS_CSV_PATH, header=True, inferSchema=True)

# Preview the first five rows without truncating wide cell values.
df.show(5, truncate=False)

+------------+-----------------------+-----------+----------+---------+----------------+------------+-----------+---------+---------+-------------+----------+-------------+-----------------------+---------------+------------+--------------------+-----------------------+---------------------+-----------------------+---------+-----------+----------------+-----------------------------+-------------+--------------+---------+--------------+------------------+-----------------------+---------------+------------------+-----------------+-----------+------------+--------------------+-------------------+----------------+-----------------------+----------------------+------------------+-------------------+------------------+-------------------+------------------+---------------------------+--------------------------+-------------+-------------------+------------------------+--------------------------+--------------------+----------------------------+----------------------+-----------------+------

# Condition

In [15]:
# Alert score: translate the alert label into points. Only the three labels
# below have a branch and there is no .otherwise(), so any other label (or a
# missing one) leaves alert_score null.
alert_points = (
    when(col("alert") == "Normal", 0)
    .when(col("alert") == "Warning", 30)
    .when(col("alert") == "Danger", 50)
)

# Ratios that drive the 0/10 operational condition flags.
# Age: equipment age in days relative to the expected lifetime converted to days.
age_ratio = col("equipment_age_days") / (col("expected_lifetime_years") * 365)
# Downtime: share of downtime in the combined operating + downtime hours.
downtime_ratio = col("downtime_hours") / (col("operating_hours") + col("downtime_hours"))
# Maintenance: days since maintenance relative to the warranty period in days.
maintenance_ratio = col("days_since_maintenance") / (col("warranty_period_years") * 365)
# Environment: either ambient reading above its limit counts.
harsh_environment = (col("ambient_temperature") > 40) | (col("ambient_humidity") > 60)

df = (
    df
    .withColumn("alert_score", alert_points)
    # 1. Age condition: 10 points once the age ratio reaches 0.65.
    .withColumn("age_condition_score", when(age_ratio >= 0.65, 10).otherwise(0))
    # 2. Downtime condition: 10 points when the downtime share exceeds 0.05.
    .withColumn("downtime_condition_score", when(downtime_ratio > 0.05, 10).otherwise(0))
    # 3. Maintenance condition: 10 points when the ratio exceeds 0.5.
    .withColumn("maintenance_condition_score", when(maintenance_ratio > 0.5, 10).otherwise(0))
    # 4. Environment condition: 10 points when temperature > 40 or humidity > 60.
    .withColumn("environment_condition_score", when(harsh_environment, 10).otherwise(0))
)


In [16]:
# 5. Maintenance cost condition
#
# F.col / F.when are used throughout this cell so it does not depend on the
# bare names `col` / `when` still pointing at the Spark functions.

# Columns this cell derives. Remove any copies left over from an earlier run
# of the cell BEFORE capturing `initial_columns`; otherwise the join below
# would add a second "criticality_avg_annual_cost" column and the final select
# would list the derived columns twice. DataFrame.drop ignores names that are
# not present, so this is a no-op on a fresh run.
derived_columns = ["criticality_avg_annual_cost", "threshold", "maintenance_cost_condition_score"]
df = df.drop(*derived_columns)

initial_columns = df.columns

# Per-row annualised maintenance cost: cumulative cost divided by the warranty
# period in years.
annual_maintenance_cost = F.col("cumulative_maintenance_cost") / F.col("warranty_period_years")

# Average annualised maintenance cost for each equipment criticality level.
maintenance_avg_by_criticality = df.groupBy("criticality").agg(
    F.avg(annual_maintenance_cost).alias("criticality_avg_annual_cost")
)

# Attach the group average to every row of its criticality level.
df = df.join(
    maintenance_avg_by_criticality,
    on="criticality",
    how="left",
)

# Threshold is 150% of the group average; keep it as a column in the dataset.
df = df.withColumn("threshold", 1.5 * F.col("criticality_avg_annual_cost"))

# 10 points when the row's annualised cost is above its group's threshold.
df = df.withColumn(
    "maintenance_cost_condition_score",
    F.when(annual_maintenance_cost > F.col("threshold"), 10).otherwise(0),
)

# The join moves the join key to the front; restore the original column order
# and append the three derived columns.
final_columns = initial_columns + derived_columns
df = df.select(final_columns)

# Total operational score: the sum of the five condition scores.
df = df.withColumn(
    "operational_score",
    F.col("age_condition_score")
    + F.col("downtime_condition_score")
    + F.col("maintenance_condition_score")
    + F.col("environment_condition_score")
    + F.col("maintenance_cost_condition_score"),
)

In [17]:
# Score columns whose value distribution is reported below.
columns = [
    "age_condition_score",
    "downtime_condition_score",
    "maintenance_condition_score",
    "environment_condition_score",
    "maintenance_cost_condition_score",
    "operational_score",
    "alert_score"
]

# The row count is the same for every column, so run that Spark job once
# instead of once per loop iteration.
total_rows = df.count()

# The loop variable is deliberately not named `col`: that would rebind the
# imported pyspark.sql.functions.col to a plain string for every cell that
# runs afterwards.
for score_column in columns:
    # For each distinct score value: number of non-null rows and their share
    # of all rows, as a percentage.
    result = (
        df.groupBy(score_column)
        .agg(
            F.count(score_column).alias("count"),
            (F.count(score_column) / total_rows * 100).alias("percentage")
        )
        .orderBy(score_column)
    )
    print(f"Statistics for column: {score_column}")
    result.show()

Statistics for column: age_condition_score
+-------------------+-----+-----------------+
|age_condition_score|count|       percentage|
+-------------------+-----+-----------------+
|                  0|60508|84.69527728786989|
|                 10|10934|15.30472271213012|
+-------------------+-----+-----------------+

Statistics for column: downtime_condition_score
+------------------------+-----+------------------+
|downtime_condition_score|count|        percentage|
+------------------------+-----+------------------+
|                       0|17064|23.885109599395314|
|                      10|54378| 76.11489040060468|
+------------------------+-----+------------------+

Statistics for column: maintenance_condition_score
+---------------------------+-----+-----------------+
|maintenance_condition_score|count|       percentage|
+---------------------------+-----+-----------------+
|                          0|66442| 93.0013157526385|
|                         10| 5000|6.998684247361496

In [18]:
# Bind `when` and `col` to the Spark SQL functions again before using them, so
# this cell works even if either name was reassigned earlier in the session.
from pyspark.sql.functions import when, col

# total_score is the alert score plus the operational score; rows whose total
# is at least 60 are labelled as needing maintenance, all others are not.
df = (
    df
    .withColumn("total_score", col("alert_score") + col("operational_score"))
    .withColumn(
        "maintenance_needed",
        when(col("total_score") >= 60, "Maintenance required")
        .otherwise("No maintenance required"),
    )
)

# Number of rows overall, used as the denominator for the percentages.
total_count = df.count()

# Rows per label, then each label's share of all rows as a percentage.
df_counts = df.groupBy("maintenance_needed").count()
df_percentage = df_counts.withColumn(
    "percentage",
    (col("count") / total_count) * 100,
)

# Display the label, its row count and its percentage.
df_percentage.select("maintenance_needed", "count", "percentage").show(truncate=False)

+-----------------------+-----+------------------+
|maintenance_needed     |count|percentage        |
+-----------------------+-----+------------------+
|Maintenance required   |12730|17.818650093782367|
|No maintenance required|58712|82.18134990621763 |
+-----------------------+-----+------------------+



In [19]:
# Print each column's name, data type and nullability for the current DataFrame.
df.printSchema()

root
 |-- equipment_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- temperature: double (nullable = true)
 |-- vibration: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- rotational_speed: double (nullable = true)
 |-- power_output: double (nullable = true)
 |-- noise_level: double (nullable = true)
 |-- voltage: double (nullable = true)
 |-- current: double (nullable = true)
 |-- oil_viscosity: double (nullable = true)
 |-- model: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- max_temperature: double (nullable = true)
 |-- max_pressure: double (nullable = true)
 |-- max_rotational_speed: double (nullable = true)
 |-- expected_lifetime_years: double (nullable = true)
 |-- warranty_period_years: integer (nullable = true)
 |-- last_major_overhaul: timestamp (nullable = true)
 |-- location: string (nullable = true)
 |-- criticality: string (nullable = true)
 |

In [23]:
# Export the Spark DataFrame to one CSV file by going through pandas.
try:
    # Materialise the Spark DataFrame as a pandas DataFrame.
    pandas_df = df.toPandas()

    import os

    # Make sure the target folder exists; an existing folder is not an error.
    os.makedirs("Maintenance recommendation", exist_ok=True)

    # Write the CSV without the pandas index column.
    output_path = "Maintenance recommendation/dataset_maintenance_recommendation.csv"
    pandas_df.to_csv(output_path, index=False)

    print(f"File successfully saved to {output_path}")
except Exception as exc:
    # Best-effort export: report the failure as a message instead of raising.
    print("Error saving file:", str(exc))

File successfully saved to Maintenance recommendation/dataset_maintenance_recommendation.csv
