In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    when,
    regexp_replace,
    col,
    lit,
    regexp_extract,
    broadcast,
)

In [2]:
spark = ( SparkSession.builder
   .appName("My App") 
   .master("local")
   .getOrCreate()
)

container_path = os.getcwd()

- equipment_sensors.csv

In [3]:
es = spark.read.csv(
    f"{container_path}/data/equipment_sensors.csv", 
    header=True, 
    inferSchema=True
)

es.printSchema()
es.show()

root
 |-- equipment_id: integer (nullable = true)
 |-- sensor_id: integer (nullable = true)

+------------+---------+
|equipment_id|sensor_id|
+------------+---------+
|           1|     4275|
|           2|     5212|
|           3|     7381|
|           4|      396|
|           5|     1645|
|           6|     9532|
|           7|     3328|
|           8|     8461|
|           9|     2173|
|          10|     3499|
|          11|     8551|
|          12|     3965|
|          13|     8271|
|          14|     2768|
|           1|     8725|
|           2|     3167|
|           3|     6078|
|           4|     7001|
|           5|     5477|
|           6|     2278|
+------------+---------+
only showing top 20 rows



- equipment.json

In [4]:
eq = spark.read.option("multiline", "true").json(
    f"{container_path}/data/equipment.json"
)
eq = eq.withColumn("equipment_id", col("equipment_id").cast("int"))

eq.printSchema()
eq.show()


root
 |-- equipment_id: integer (nullable = true)
 |-- group_name: string (nullable = true)
 |-- name: string (nullable = true)

+------------+----------+--------+
|equipment_id|group_name|    name|
+------------+----------+--------+
|           1|  FGHQWR2Q|5310B9D7|
|           2|  VAPQY59S|43B81579|
|           3|  FGHQWR2Q|E1AD07D4|
|           4|  9N127Z5P|ADE40E7F|
|           5|  9N127Z5P|78FFAD0C|
|           6|  PA92NCXZ|9AD15F7E|
|           7|  FGHQWR2Q|E54B5C3A|
|           8|  NQWPA8D3|86083278|
|           9|  VAPQY59S|3329175B|
|          10|  NQWPA8D3|98B84035|
|          11|  PA92NCXZ|09C37FB8|
|          12|  FGHQWR2Q|CF304D24|
|          13|  Z9K1SAP4|4E834E81|
|          14|  VAPQY59S|2C195700|
+------------+----------+--------+



In [5]:
# Fazemos um mapping para o grupo dos equipamentos, para melhor performance

eq = eq.withColumn(
    "group_name_id",
    when(col("group_name") == "FGHQWR2Q", 0)
    .when(col("group_name") == "VAPQY59S", 1)
    .when(col("group_name") == "9N127Z5P", 2)
    .when(col("group_name") == "PA92NCXZ", 3)
    .when(col("group_name") == "NQWPA8D3", 4)
    .when(col("group_name") == "Z9K1SAP4", 5)
    .otherwise(-1)
    .cast("int"),
)

eq.printSchema()
eq.show()

root
 |-- equipment_id: integer (nullable = true)
 |-- group_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- group_name_id: integer (nullable = false)

+------------+----------+--------+-------------+
|equipment_id|group_name|    name|group_name_id|
+------------+----------+--------+-------------+
|           1|  FGHQWR2Q|5310B9D7|            0|
|           2|  VAPQY59S|43B81579|            1|
|           3|  FGHQWR2Q|E1AD07D4|            0|
|           4|  9N127Z5P|ADE40E7F|            2|
|           5|  9N127Z5P|78FFAD0C|            2|
|           6|  PA92NCXZ|9AD15F7E|            3|
|           7|  FGHQWR2Q|E54B5C3A|            0|
|           8|  NQWPA8D3|86083278|            4|
|           9|  VAPQY59S|3329175B|            1|
|          10|  NQWPA8D3|98B84035|            4|
|          11|  PA92NCXZ|09C37FB8|            3|
|          12|  FGHQWR2Q|CF304D24|            0|
|          13|  Z9K1SAP4|4E834E81|            5|
|          14|  VAPQY59S|2C195700|      

- equpment_failure_sensors.txt

In [6]:
efs = spark.read.text(f"{container_path}/data/equpment_failure_sensors.txt")

efs = efs.select(
    regexp_extract("value", r"\[(.*?)\]", 1).alias("date"),
    regexp_extract("value", r"\]\t(.*?)\t", 1).alias("situation"),
    regexp_extract("value", r"sensor\[(.*?)\]", 1).alias("sensor"),
    regexp_extract("value", r"temperature\t(.*?),", 1).alias("temperature"),
    regexp_extract("value", r"vibration\t(.*?)\)", 1).alias("vibration")
)

In [7]:
efs.printSchema()

root
 |-- date: string (nullable = true)
 |-- situation: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- vibration: string (nullable = true)



In [8]:
efs.show(27)

+-------------------+---------+------+-----------+---------+
|               date|situation|sensor|temperature|vibration|
+-------------------+---------+------+-----------+---------+
| 2021-05-18 0:20:48|    ERROR|  5820|     311.29|  6749.50|
| 2021-05-18 0:20:48|    ERROR|  5820|     311.29|  6749.50|
| 2021-06-14 19:46:9|    ERROR|  3359|     270.00|  -335.39|
|2020-09-27 22:55:11|    ERROR|  9503|     255.84|  1264.54|
|  2019-02-9 20:56:4|    ERROR|  3437|     466.57| -1865.26|
|  2019-02-6 6:19:34|    ERROR|  2958|     143.02| -4993.13|
|2019-08-10 20:23:22|    ERROR|  3743|     249.81|  8925.85|
|2021-03-25 14:39:49|    ERROR|  6282|     475.57| -2859.46|
|2020-05-15 17:30:17|    ERROR|  2477|     200.20| -6866.76|
|2020-12-11 11:52:47|    ERROR|  3838|     330.33| -6170.30|
| 2019-04-16 8:28:34|    ERROR|  4031|     310.94|  7200.50|
|  2020-10-1 20:8:30|    ERROR|  6848|     495.47|  7626.35|
| 2019-03-13 4:13:22|    ERROR|  1633|     124.81| -5077.34|
|2020-01-11 11:43:48|   

In [9]:
efs = (
    efs.withColumn("date", regexp_replace("date", "/", "-"))
    .withColumn(
        "date",
        when(col("date").contains("-"), col("date")).otherwise(
            regexp_replace("date", "(\d{4})-(\d{2})-(\d{2})", "$1-$2-$3 00:00:00")
        ),
    )
    .withColumn("date", col("date").cast("timestamp"))
)

#######################################################################################

error_regex = r"^\s*(ERROR)\s*$"  # Caso ERROR -> 1
warning_regex = r"^\s*(WARNING)\s*$"  # Caso WARNING -> 0


efs = efs.withColumn(
    "situation",
    when(regexp_extract(col("situation"), error_regex, 1) == "ERROR", 1)
    .when(regexp_extract(col("situation"), warning_regex, 1) == "WARNING", 0)
    .otherwise(lit(-1)),
)

efs = efs.withColumn("situation", col("situation").cast("int"))

#######################################################################################

efs = efs.withColumn(
    "sensor",
    when(col("sensor").cast("int").isNotNull(), col("sensor").cast("int")).otherwise(
        lit(-1)
    ),
)

efs = efs.withColumn(
    "temperature",
    when(
        col("temperature").cast("float").isNotNull(), col("temperature").cast("float")
    ).otherwise(lit(-1)),
)

efs = efs.withColumn(
    "vibration",
    when(
        col("vibration").cast("float").isNotNull(), col("vibration").cast("float")
    ).otherwise(lit(None)),
)


In [10]:
efs.show(27)

+-------------------+---------+------+-----------+---------+
|               date|situation|sensor|temperature|vibration|
+-------------------+---------+------+-----------+---------+
|2021-05-18 00:20:48|        1|  5820|     311.29|   6749.5|
|2021-05-18 00:20:48|        1|  5820|     311.29|   6749.5|
|2021-06-14 19:46:09|        1|  3359|      270.0|  -335.39|
|2020-09-27 22:55:11|        1|  9503|     255.84|  1264.54|
|2019-02-09 20:56:04|        1|  3437|     466.57| -1865.26|
|2019-02-06 06:19:34|        1|  2958|     143.02| -4993.13|
|2019-08-10 20:23:22|        1|  3743|     249.81|  8925.85|
|2021-03-25 14:39:49|        1|  6282|     475.57| -2859.46|
|2020-05-15 17:30:17|        1|  2477|      200.2| -6866.76|
|2020-12-11 11:52:47|        1|  3838|     330.33|  -6170.3|
|2019-04-16 08:28:34|        1|  4031|     310.94|   7200.5|
|2020-10-01 20:08:30|        1|  6848|     495.47|  7626.35|
|2019-03-13 04:13:22|        1|  1633|     124.81| -5077.34|
|2020-01-11 11:43:48|   

In [11]:
efs_tmp = efs.join(
    broadcast(es),
    on=[efs["sensor"] == es["sensor_id"]],
    how="left",
)#.drop("sensor")

efs_tmp = efs_tmp.select(
    col("date"),
    col("situation"),
    col("sensor_id"),
    col("temperature"),
    col("vibration"),
    col("equipment_id").alias("equipment"),
)

efs_end = efs_tmp.join(
    broadcast(eq),
    on=[efs_tmp["equipment"] == eq["equipment_id"]],
    how="left",
).drop("equipment", "group_name", "name")

efs_end = efs_end.dropDuplicates()

In [12]:
efs_end.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- situation: integer (nullable = false)
 |-- sensor_id: integer (nullable = true)
 |-- temperature: float (nullable = true)
 |-- vibration: float (nullable = true)
 |-- equipment_id: integer (nullable = true)
 |-- group_name_id: integer (nullable = true)



In [13]:
efs_end.show(27)

+-------------------+---------+---------+-----------+---------+------------+-------------+
|               date|situation|sensor_id|temperature|vibration|equipment_id|group_name_id|
+-------------------+---------+---------+-----------+---------+------------+-------------+
|2019-05-08 16:41:55|        1|     8010|     424.18|  9257.33|           8|            4|
|2021-09-19 11:33:50|        1|     4746|     187.46| -8001.04|          10|            4|
|2020-11-21 20:21:05|        1|      354|     391.65|  -6871.0|          13|            5|
|2021-08-11 03:56:03|        1|     7256|      86.46|  2227.63|          11|            3|
|2019-02-18 15:26:15|        1|     2958|     369.08|   144.19|          14|            1|
|2020-05-09 11:39:44|        1|      404|     349.08|  -7069.6|           7|            0|
|2019-06-06 04:09:41|        1|     4726|     455.08| -7051.81|           4|            2|
|2019-08-03 01:50:55|        1|     3668|     236.94| -8260.44|           9|            1|

In [14]:
# Criando as views para fazer as consultas
efs_end.createOrReplaceTempView("efs_end")
eq.createOrReplaceTempView("eq")

1. Total equipment failures that happened?

In [15]:
total_failures_df = spark.sql(
    """
    SELECT COUNT(*) AS total_failures
    FROM efs_end
    WHERE situation = 1
"""
)

total_failures_df.show() 

# +--------------+
# |total_failures|
# +--------------+
# |       4749474|
# +--------------+

+--------------+
|total_failures|
+--------------+
|       4749474|
+--------------+



2. Which equipment name had most failures?

In [16]:
most_failed_equipment = spark.sql(
    """
    SELECT eq.name AS equipment_name, 
    COUNT(ef.situation) AS total_failures
    FROM efs_end ef 
    LEFT JOIN eq
    ON eq.equipment_id = ef.equipment_id
    WHERE ef.situation = 1
    GROUP BY eq.name
    ORDER BY total_failures DESC
    LIMIT 1
"""
)

most_failed_equipment.show()

# +--------------+--------------+
# |equipment_name|total_failures|
# +--------------+--------------+
# |      2C195700|        343800|
# +--------------+--------------+

+--------------+--------------+
|equipment_name|total_failures|
+--------------+--------------+
|      2C195700|        343800|
+--------------+--------------+



3. Average amount of failures across equipment group, ordered by the number of failures in ascending order?

In [17]:
average_failures_by_group = spark.sql(
    """
    SELECT eq.group_name, 
    AVG(ef.failures_sum) AS average_failures
    FROM  
     (
        SELECT equipment_id, 
        COUNT(situation) AS failures_sum
        FROM efs_end
        WHERE situation = 1
        GROUP BY equipment_id
     ) ef
    LEFT JOIN eq
    ON eq.equipment_id = ef.equipment_id
    GROUP BY eq.group_name
    ORDER BY average_failures ASC
"""
)

average_failures_by_group.show()

# +----------+-----------------+
# |group_name| average_failures|
# +----------+-----------------+
# |  FGHQWR2Q|        336217.25|
# |  PA92NCXZ|         339070.5|
# |  9N127Z5P|         340365.5|
# |  Z9K1SAP4|         340549.0|
# |  NQWPA8D3|         340818.0|
# |  VAPQY59S|341182.6666666667|
# +----------+-----------------+

+----------+-----------------+
|group_name| average_failures|
+----------+-----------------+
|  FGHQWR2Q|        336217.25|
|  PA92NCXZ|         339070.5|
|  9N127Z5P|         340365.5|
|  Z9K1SAP4|         340549.0|
|  NQWPA8D3|         340818.0|
|  VAPQY59S|341182.6666666667|
+----------+-----------------+



4.  Rank the sensors which present the most number of errors by equipment name in an equipment group.

In [18]:
ranked_sensors_by_errors = spark.sql(
    """
    SELECT eq.group_name, 
    eq.name AS equipment_name, 
    ef.sensor_id, 
    COUNT(ef.situation) AS error_count,
    ROW_NUMBER() OVER(PARTITION BY eq.group_name, eq.name ORDER BY COUNT(ef.situation) DESC) AS rank
    FROM efs_end ef 
    LEFT JOIN eq  
    ON eq.equipment_id = ef.equipment_id
    WHERE ef.situation = 1
    GROUP BY eq.group_name, eq.name, ef.sensor_id
"""
)

ranked_sensors_by_errors.show()

# +----------+--------------+---------+-----------+----+
# |group_name|equipment_name|sensor_id|error_count|rank|
# +----------+--------------+---------+-----------+----+
# |  9N127Z5P|      78FFAD0C|      582|        570|   1|
# |  9N127Z5P|      78FFAD0C|     6261|        561|   2|
# |  9N127Z5P|      78FFAD0C|     1361|        560|   3|
# |  9N127Z5P|      78FFAD0C|     2871|        557|   4|
# |  9N127Z5P|      78FFAD0C|     6293|        552|   5|
# |  9N127Z5P|      78FFAD0C|     6344|        552|   6|
# |  9N127Z5P|      78FFAD0C|     9917|        552|   7|
# |  9N127Z5P|      78FFAD0C|     9576|        551|   8|
# |  9N127Z5P|      78FFAD0C|     2635|        551|   9|
# |  9N127Z5P|      78FFAD0C|     6231|        551|  10|
# |  9N127Z5P|      78FFAD0C|     6848|        549|  11|
# |  9N127Z5P|      78FFAD0C|     7983|        549|  12|
# |  9N127Z5P|      78FFAD0C|      465|        548|  13|
# |  9N127Z5P|      78FFAD0C|     3824|        548|  14|
# |  9N127Z5P|      78FFAD0C|     7776|        546|  15|
# |  9N127Z5P|      78FFAD0C|     2157|        546|  16|
# |  9N127Z5P|      78FFAD0C|     4502|        545|  17|
# |  9N127Z5P|      78FFAD0C|     5539|        545|  18|
# |  9N127Z5P|      78FFAD0C|     2594|        545|  19|
# |  9N127Z5P|      78FFAD0C|     7688|        545|  20|
# +----------+--------------+---------+-----------+----+
# only showing top 20 rows

+----------+--------------+---------+-----------+----+
|group_name|equipment_name|sensor_id|error_count|rank|
+----------+--------------+---------+-----------+----+
|  9N127Z5P|      78FFAD0C|      582|        570|   1|
|  9N127Z5P|      78FFAD0C|     6261|        561|   2|
|  9N127Z5P|      78FFAD0C|     1361|        560|   3|
|  9N127Z5P|      78FFAD0C|     2871|        557|   4|
|  9N127Z5P|      78FFAD0C|     6293|        552|   5|
|  9N127Z5P|      78FFAD0C|     6344|        552|   6|
|  9N127Z5P|      78FFAD0C|     9917|        552|   7|
|  9N127Z5P|      78FFAD0C|     9576|        551|   8|
|  9N127Z5P|      78FFAD0C|     2635|        551|   9|
|  9N127Z5P|      78FFAD0C|     6231|        551|  10|
|  9N127Z5P|      78FFAD0C|     6848|        549|  11|
|  9N127Z5P|      78FFAD0C|     7983|        549|  12|
|  9N127Z5P|      78FFAD0C|      465|        548|  13|
|  9N127Z5P|      78FFAD0C|     3824|        548|  14|
|  9N127Z5P|      78FFAD0C|     7776|        546|  15|
|  9N127Z5

In [19]:
spark.stop()