## ** Equipment Failure Analysis **
Pyspark Notebook to supply information within:
* Total equipment failures that happened?
* Which equipment name had most failures?
* Average amount of failures across equipment group, ordered by the number of failures in ascending order?
* Rank the sensors which present the most number of errors by equipment name in an equipment group.

#### notes:
| information | data | notes |
|-------------|:-----------:|------------:|
| Dataset used | 'equipment_sensors.csv', 'equipment.parquet' and 'equpment_failure_sensors.txt' | None |
| Data paths | update path variables in order to work correctly. | None |
| 



#### Environment Setup
<a id="environment_setup"></a>

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
import os
from datetime import datetime
import logging


spark = SparkSession.builder \
    .appName("Equipment Failure Analysis") \
    .config("spark.some.config.option", "some-value") \
    .enableHiveSupport() \
    .getOrCreate()

# Configuração do Logger para registrar informações
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Equipment Failure Analysis")

# Formatação do nome do arquivo com timestamp
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir = "dbfs/FileStore/shared_uploads/mbarbugli@gmail.com"
log_filename = f"{log_dir}/total_falhas_{timestamp}.txt"

logger = logging.getLogger(f"{timestamp} [INFO] Equipment Failure Analysis - Inacializando...")
logger.info(f"{timestamp} [INFO] Sessão Spark iniciada.")

# Ler os dados
df_sensors = spark.read.csv("dbfs:/FileStore/equipment_sensors.csv", header=True, inferSchema=True)
df_equipment = spark.read.parquet("dbfs:/FileStore/equipment.parquet")
df_logs = spark.read.option("delimiter", "\t").csv("dbfs:/FileStore/equpment_failure_sensors.txt", header=False, inferSchema=True)

logger.info(f"{timestamp} [INFO] Os dados foram carregados com sucesso!")
logger.info(f"{timestamp} [INFO] Atualizando Schema: {df_logs}")

# Schema
df_logs = df_logs.toDF("date", "status", "raw_sensor_data", "temperature", "vibration", "signal")
df_logs = df_logs.withColumn(
    "sensor_id", 
    regexp_extract("raw_sensor_data", r"sensor\[(\d+)\]:", 1)
)
# df_logs = df_logs.join(df_sensors, "sensor_id").join(df_equipment, "equipment_id")
logger.info(f"{timestamp} [UPDATE] O Schema: {df_logs}, foi atualizado com sucesso!")

# logger.info(f"{timestamp} [INFO] Criando tabelas permanentes no metastore...")
# # Criar tabelas permanentes no metastore
# df_sensors.write.saveAsTable("sensors")
# df_equipment.write.saveAsTable("equipment")
# df_logs.write.format("csv").option("header", "false").saveAsTable("logs")
# logger.info(f"{timestamp} [INFO] Tabelas criadas com sucesso!")

log_file_path = f"dbfs:/FileStore/log_{timestamp}_equipment_failure.txt"  # TODO! Atualizar caminho.
# Escrevendo no arquivo de log
try:
    with open(log_file_path, 'w') as file:
        logger.info(f"{timestamp} [INFO] Dados dos logs salvos em {log_file_path}") 
except Exception as e:
    logger.error(f"{timestamp} [ERROR] {e}. LOG FILE WAS NOT CREATED! Please check!")

# Criando Alias
df_logs_alias = df_logs.alias("logs")
df_sensors_alias = df_sensors.alias("sensors")
df_equipment_alias = df_equipment.alias("equipment")

logger.info(f"{timestamp} [INFO] Environment Setup finalizado com sucesso!")

# display - TODO:
display(df_sensors)
display(df_equipment)
display(df_logs)

2024-04-22 23:57:55,000 - INFO - 2024-04-22_23-57-54 [INFO] Sessão Spark iniciada.
2024-04-22 23:58:09,313 - INFO - 2024-04-22_23-57-54 [INFO] Os dados foram carregados com sucesso!
2024-04-22 23:58:09,316 - INFO - 2024-04-22_23-57-54 [INFO] Atualizando Schema: DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]
2024-04-22 23:58:09,357 - INFO - 2024-04-22_23-57-54 [UPDATE] O Schema: DataFrame[date: string, status: string, raw_sensor_data: string, temperature: string, vibration: string, signal: string, sensor_id: string], foi atualizado com sucesso!
2024-04-22 23:58:09,359 - ERROR - 2024-04-22_23-57-54 [ERROR] [Errno 2] No such file or directory: 'dbfs:/FileStore/log_2024-04-22_23-57-54_equipment_failure.txt'. LOG FILE WAS NOT CREATED! Please check!
2024-04-22 23:58:09,366 - INFO - 2024-04-22_23-57-54 [INFO] Environment Setup finalizado com sucesso!


equipment_id,sensor_id
1,4275
2,5212
3,7381
4,396
5,1645
6,9532
7,3328
8,8461
9,2173
10,3499


equipment_id,name,group_name
1,5310B9D7,FGHQWR2Q
2,43B81579,VAPQY59S
3,E1AD07D4,FGHQWR2Q
4,ADE40E7F,9N127Z5P
5,78FFAD0C,9N127Z5P
6,9AD15F7E,PA92NCXZ
7,E54B5C3A,FGHQWR2Q
8,86083278,NQWPA8D3
9,3329175B,VAPQY59S
10,98B84035,NQWPA8D3


date,status,raw_sensor_data,temperature,vibration,signal,sensor_id
[2021-05-18 0:20:48],ERROR,sensor[5820]:,(temperature,"311.29, vibration",6749.50),5820
[2021-05-18 0:20:48],ERROR,sensor[5820]:,(temperature,"311.29, vibration",6749.50),5820
[2021-06-14 19:46:9],ERROR,sensor[3359]:,(temperature,"270.00, vibration",-335.39),3359
[2020-09-27 22:55:11],ERROR,sensor[9503]:,(temperature,"255.84, vibration",1264.54),9503
[2019-02-9 20:56:4],ERROR,sensor[3437]:,(temperature,"466.57, vibration",-1865.26),3437
[2019-02-6 6:19:34],ERROR,sensor[2958]:,(temperature,"143.02, vibration",-4993.13),2958
[2019-08-10 20:23:22],ERROR,sensor[3743]:,(temperature,"249.81, vibration",8925.85),3743
[2021-03-25 14:39:49],ERROR,sensor[6282]:,(temperature,"475.57, vibration",-2859.46),6282
[2020-05-15 17:30:17],ERROR,sensor[2477]:,(temperature,"200.20, vibration",-6866.76),2477
[2020-12-11 11:52:47],ERROR,sensor[3838]:,(temperature,"330.33, vibration",-6170.30),3838


### ErrorLogAnalysis
<a id2="ErrorLogAnalysis"></a>

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ErrorLogAnalysis").getOrCreate()

# Log inicial
logger.info(f"{timestamp} [INFO] Equipment Failure Analysis - Inciando o 'ErrorLogAnalysis'")
logger.info(f"{timestamp} [INFO] Criando tabelas: 'failures_with_equipment' e 'equipment_failure_counts'...") 

# Join
failures_with_equipment = df_logs_alias \
    .join(df_sensors_alias, df_logs_alias["sensor_id"] == df_sensors_alias["sensor_id"]) \
    .join(df_equipment_alias, df_sensors_alias["equipment_id"] == df_equipment_alias["equipment_id"])

logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!") 
logger.info(f"{timestamp} [INFO] Inciando Transformação de dados: ErrorLogAnalysis ") 

# Filtrar apenas linhas com 'status' igual a 'ERROR'
failures_with_equipment = failures_with_equipment.filter(col("status") == "ERROR")

# 'count' na coluna 'name' from dataframe 'equipment'
equipment_failure_counts = failures_with_equipment \
    .groupBy(df_equipment_alias["name"]) \
    .count() \
    .orderBy(col("count").desc())

# Criando dataframe equipment_failure_counts
equipment_failure_counts = equipment_failure_counts \
    .withColumnRenamed("name", "equipment_name") \
    .withColumnRenamed("count", "equipment_failure_count")
most_failures_equipment = equipment_failure_counts.first()
logger.info(f"{timestamp} [WARNING] Equipamento com maior numero de falhas: {most_failures_equipment} ") 

# Salvando a tabela no databricks.
logger.info(f"{timestamp} [INFO] Salvando tabelas: {equipment_failure_counts}...") 
equipment_failure_counts.write.option("overwriteSchema", "true").format("delta").mode("overwrite").saveAsTable("most_failures_equipment")
logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!") 
display(equipment_failure_counts)
logger.info(f"{timestamp} [INFO] ErrorLogAnalysis - Finalizado com sucesso!") 


2024-04-22 23:58:10,896 - INFO - 2024-04-22_23-57-54 [INFO] Equipment Failure Analysis - Inciando o 'ErrorLogAnalysis'
2024-04-22 23:58:10,898 - INFO - 2024-04-22_23-57-54 [INFO] Criando tabelas: 'failures_with_equipment' e 'equipment_failure_counts'...
2024-04-22 23:58:10,944 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!
2024-04-22 23:58:10,946 - INFO - 2024-04-22_23-57-54 [INFO] Inciando Transformação de dados: ErrorLogAnalysis 
2024-04-22 23:58:31,102 - INFO - 2024-04-22_23-57-54 [INFO] Salvando tabelas: DataFrame[equipment_name: string, equipment_failure_count: bigint]...
2024-04-22 23:58:56,661 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!


equipment_name,equipment_failure_count
2C195700,343800
9AD15F7E,343526
98B84035,342870
78FFAD0C,341033
4E834E81,340549
43B81579,340188
E1AD07D4,340169
ADE40E7F,339698
3329175B,339561
86083278,338766


2024-04-22 23:59:17,080 - INFO - 2024-04-22_23-57-54 [INFO] ErrorLogAnalysis - Finalizado com sucesso!


### Equipment with Most Failures
<a id4="equipment_most_failures"></a>

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("EquipmentWithMostFailures").getOrCreate()


logger.info(f"{timestamp} [INFO] Iniciando Transformação de dados: Equipment with Most Failures ") 
equipment_failures = equipment_failure_counts.groupBy("equipment_name").count()
most_failures = equipment_failures.orderBy(col("count").desc()).limit(1)

# Extrair o nome do equipamento com o maior número de falhas (em lista)
most_failures_collect = most_failures.collect() 
if most_failures_collect:
    most_failures_row = most_failures_collect[0] 
    equipment_with_most_failures = most_failures_row["equipment_name"]
    logger.info(f"{timestamp} [INFO] Equipamento com maior número de falha: {equipment_with_most_failures}")
else:
    logger.info(f"{timestamp} [INFO] Nenhum dado encontrado para equipamentos com falhas.")

# Verifica se 'most_failures_collect' é uma lista e transforma em dataframe
if isinstance(most_failures_collect, list):
    most_failures_collect = spark.createDataFrame(most_failures_collect)

# Salvando a tabela no databricks.
logger.info(f"{timestamp} [INFO] Criando tabelas: 'most_failures_collect'...") 
most_failures_collect.write.option("overwriteSchema", "true").format("delta").mode("overwrite").saveAsTable("most_failures_collect")
display(most_failures)
logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!")
logger.info(f"{timestamp} [INFO] Equipment with Most Failures - Finalizado com sucesso!") 



2024-04-22 23:59:17,213 - INFO - 2024-04-22_23-57-54 [INFO] Iniciando Transformação de dados: Equipment with Most Failures 
2024-04-22 23:59:37,559 - INFO - 2024-04-22_23-57-54 [INFO] Equipamento com maior número de falha: 2C195700
2024-04-22 23:59:37,806 - INFO - 2024-04-22_23-57-54 [INFO] Criando tabelas: 'most_failures_collect'...


equipment_name,count
2C195700,1


2024-04-23 00:00:03,805 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!
2024-04-23 00:00:03,806 - INFO - 2024-04-22_23-57-54 [INFO] Equipment with Most Failures - Finalizado com sucesso!


### Average Failures by Equipment Group
<a id5="average_group"></a>

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

logger.info(f"{timestamp} [INFO] Inciando Transformação de dados: Average Failures by Equipment Group") 
logger.info(f"{timestamp} [INFO] Criando DataFrame...")

# Criação de dataframes
failures_with_equipment = df_logs_alias.join(
    df_sensors_alias,
    df_logs_alias["sensor_id"] == df_sensors_alias["sensor_id"],
    "inner"
).join(
    df_equipment_alias,
    df_sensors_alias["equipment_id"] == df_equipment_alias["equipment_id"],
    "inner"
)
logger.info(f"{timestamp} [INFO] DataFrame criado com sucesso!") 

# Calculo da media de falhas por Grupo de Equipamento 
logger.info(f"{timestamp} [INFO] Inciando o calculo da media de falhas por Grupo de Equipamento") 
equipment_failure_counts = failures_with_equipment.groupBy(
    df_equipment_alias["equipment_id"].alias("equipment_id"),
    "group_name"
).agg(
    F.count("status").alias("equipment_failure_count")
)

# Calculo total de falhas entre todos os grupos
windowSpec = Window.partitionBy()  # Sem partition
total_failures = F.sum("equipment_failure_count").over(windowSpec)

# Calculo da porcengatem do total de falhas por grupo e criando coluna
equipment_failure_counts = equipment_failure_counts.withColumn(
    "failure_percent",
    F.col("equipment_failure_count") / total_failures * 100
)

# Calculo média por 'group_name'
avg_failures_per_group = equipment_failure_counts.groupBy("group_name").agg(
    F.avg("equipment_failure_count").alias("avg_failures"),
    F.avg("failure_percent").alias("avg_failure_percent")
)

# Ordenando por "'avg_failures' descending" para encontrar grupo com maior numero de falhas
avg_failures_per_group = avg_failures_per_group.orderBy(F.desc("avg_failures"))

# Salvando a tabela no databricks
logger.info(f"{timestamp} [INFO] Criando tabelas: 'avg_failures_per_group'...") 
avg_failures_per_group.write.option("overwriteSchema", "true").format("delta").mode("overwrite").saveAsTable("most_failureavg_failures_per_groups_collect")
display(avg_failures_per_group)
logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!")
logger.info(f"{timestamp} [INFO] Average Failures by Equipment Group - Finalizado com sucesso!") 

2024-04-23 00:00:03,986 - INFO - 2024-04-22_23-57-54 [INFO] Inciando Transformação de dados: Average Failures by Equipment Group
2024-04-23 00:00:03,991 - INFO - 2024-04-22_23-57-54 [INFO] Criando DataFrame...
2024-04-23 00:00:04,039 - INFO - 2024-04-22_23-57-54 [INFO] DataFrame criado com sucesso!
2024-04-23 00:00:04,041 - INFO - 2024-04-22_23-57-54 [INFO] Inciando o calculo da media de falhas por Grupo de Equipamento
2024-04-23 00:00:04,654 - INFO - 2024-04-22_23-57-54 [INFO] Criando tabelas: 'avg_failures_per_group'...


group_name,avg_failures,avg_failure_percent
NQWPA8D3,357634.5,7.152688569462287
9N127Z5P,357569.5,7.151388569722286
Z9K1SAP4,357528.0,7.150558569888286
VAPQY59S,356937.0,7.138738572252286
PA92NCXZ,356892.5,7.137848572430286
FGHQWR2Q,356867.25,7.137343572531286


2024-04-23 00:00:50,780 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!
2024-04-23 00:00:50,782 - INFO - 2024-04-22_23-57-54 [INFO] Average Failures by Equipment Group - Finalizado com sucesso!


### CSV
<a id6="csv"></a>

In [0]:
# Importar a sessão Spark
from pyspark.sql import SparkSession

logger.info(f"{timestamp} [INFO] Salvando Resultado em CSV: Inicializando...") 
logger.info(f"{timestamp} [INFO] Criando DataFrame...")
df1 = spark.table("most_failures_equipment")
df2 = spark.table("most_failures_collect")
df3 = spark.table("most_failureavg_failures_per_groups_collect")
logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!")

# DBFS (Databricks File System) onde o arquivo CSV será salvo
output_path1 = f"dbfs:/FileStore/most_failures_equipment_{timestamp}.csv"
output_path2 = f"dbfs:/FileStore/most_failures_collect_{timestamp}.csv"
output_path3 = f"dbfs:/FileStore/avg_failures_per_group_{timestamp}.csv"
logger.info(f"{timestamp} [INFO] Salvado CSV: {output_path1}, {output_path2}, {output_path3}")

# Salvar o DataFrame como CSV
df1.write.csv(path=output_path1, mode="overwrite", header=True)
df2.write.csv(path=output_path2, mode="overwrite", header=True)
df3.write.csv(path=output_path3, mode="overwrite", header=True)
logger.info(f"{timestamp} [INFO] Ação realizada com sucesso!")

logger.info(f"{timestamp} [INFO] CSV Saving - Finalizado com sucesso!") 

# Finalizando programa
logger.info(f"{timestamp} [INFO] Finalizando o Equipment Failure - Tech Case") 
logger.info(f"{timestamp} [INFO] Finalizando logs") 


2024-04-23 00:00:50,933 - INFO - 2024-04-22_23-57-54 [INFO] Salvando Resultado em CSV: Inicializando...
2024-04-23 00:00:50,935 - INFO - 2024-04-22_23-57-54 [INFO] Criando DataFrame...
2024-04-23 00:00:51,309 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!
2024-04-23 00:00:51,313 - INFO - 2024-04-22_23-57-54 [INFO] Salvado CSV: dbfs:/FileStore/most_failures_equipment_2024-04-22_23-57-54.csv, dbfs:/FileStore/most_failures_collect_2024-04-22_23-57-54.csv, dbfs:/FileStore/avg_failures_per_group_2024-04-22_23-57-54.csv
2024-04-23 00:00:55,939 - INFO - 2024-04-22_23-57-54 [INFO] Ação realizada com sucesso!
2024-04-23 00:00:55,941 - INFO - 2024-04-22_23-57-54 [INFO] CSV Saving - Finalizado com sucesso!
2024-04-23 00:00:55,942 - INFO - 2024-04-22_23-57-54 [INFO] Finalizando o Equipment Failure - Tech Case
2024-04-23 00:00:55,944 - INFO - 2024-04-22_23-57-54 [INFO] Finalizando logs
