# Aufgabenstellungen "Innovative Cheese Producing Company"

In [1]:
import os

from plotly.subplots import make_subplots
import plotly.graph_objects as go

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count, date_format, first, format_number, month, row_number, sum
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType, DecimalType

from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly

In [2]:
spark = SparkSession.Builder().getOrCreate()

In [3]:
data_file_path = r".\data"
results_path = r".\results"

## Tatsächliche Taktleistung vs. geplante Taktleistung (10 Takte/Minute)

In [4]:
data_file_name = r"oee_data.csv"
data_file = os.path.join(data_file_path, data_file_name)

input_schema = StructType([
    StructField("machine_identifier", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("expected_cycles_per_minute", DoubleType()),
    StructField("actual_cycles_per_minute", DoubleType())
])
oee_data = spark.read.csv(
    data_file,
    schema=input_schema,
    header=True
)
print(f"Number of rows in {data_file_name}: {oee_data.count()}\n")
oee_data.printSchema()
oee_data.show(100, truncate=False)

Number of rows in oee_data.csv: 120186

root
 |-- machine_identifier: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- expected_cycles_per_minute: double (nullable = true)
 |-- actual_cycles_per_minute: double (nullable = true)

+-------------------------+-------------------+--------------------------+------------------------+
|machine_identifier       |timestamp          |expected_cycles_per_minute|actual_cycles_per_minute|
+-------------------------+-------------------+--------------------------+------------------------+
|innovative_cheese_machine|2022-05-09 06:44:33|null                      |0.368069589138031       |
|innovative_cheese_machine|2022-05-30 20:51:51|null                      |5.289139747619629       |
|innovative_cheese_machine|2022-06-02 12:47:03|null                      |8.800000190734863       |
|innovative_cheese_machine|2022-06-05 22:20:38|null                      |8.800000190734863       |
|innovative_cheese_machine|2022-05-03 12:20:10

In [5]:
# one single machine?
assert oee_data.select(oee_data.machine_identifier).distinct().count() == 1, \
    "More than one single machine in data set!"

# at least one value per row in "expected_cycles_per_minute" or "actual_cycles_per_minute"
rows_both_null = oee_data.filter(oee_data.expected_cycles_per_minute.isNull() & oee_data.actual_cycles_per_minute.isNull())
assert rows_both_null.count() == 0, \
    (f"Error! There are rows in {data_file_name} where both values of cycles/minute are null",
     rows_both_null.show(truncate=False)
    )

# chronologically order by timestamp (from oldest to recent)
oee_data_chrono = oee_data.orderBy("timestamp")
# oee_data_chrono.show(100, truncate=False)

expected_cycles_per_minute = oee_data_chrono.\
    select([oee_data.timestamp, oee_data_chrono.expected_cycles_per_minute]).\
        filter(oee_data_chrono.expected_cycles_per_minute.isNotNull())
actual_cycles_per_minute = oee_data_chrono.\
    select([oee_data.timestamp, oee_data_chrono.actual_cycles_per_minute]).\
        filter(oee_data_chrono.actual_cycles_per_minute.isNotNull())

# some basic statistics
expected_cycles_per_minute.describe("expected_cycles_per_minute").show(truncate=False)
actual_cycles_per_minute.describe("actual_cycles_per_minute").show(truncate=False)

+-------+--------------------------+
|summary|expected_cycles_per_minute|
+-------+--------------------------+
|count  |693                       |
|mean   |8.024531022466794         |
|stddev |1.234863821905644         |
|min    |4.0                       |
|max    |15.0                      |
+-------+--------------------------+

+-------+------------------------+
|summary|actual_cycles_per_minute|
+-------+------------------------+
|count  |119578                  |
|mean   |6.134943375608459       |
|stddev |2.7938130270122654      |
|min    |0.0                     |
|max    |10.0                    |
+-------+------------------------+



In [None]:
pd_expected_cycles_per_minute = expected_cycles_per_minute.withColumn("timestamp", date_format("timestamp", 'yyyy-MM-dd HH:mm:ss.SSS')).toPandas()
pd_actual_cycles_per_minute = actual_cycles_per_minute.withColumn("timestamp", date_format("timestamp", 'yyyy-MM-dd HH:mm:ss.SSS')).toPandas()

TARGET_CYCLES_PER_MINUTE = 10

fig = go.Figure()
fig.add_hline(
    y=TARGET_CYCLES_PER_MINUTE,
    name="Angestrebte Takte / Minute",
    line_color="green", line_dash="dash",
    annotation_text="Angestrebte Takte / Minute", 
    annotation_position="top right"
)
fig.add_trace(
    go.Scatter(
        x=pd_actual_cycles_per_minute.timestamp,
        y=pd_actual_cycles_per_minute.actual_cycles_per_minute,
        name="Tatsächliche Takte / Minute",
        line_shape="hv", line_width=1
    )
)
fig.add_trace(
    go.Scatter(
        x=pd_expected_cycles_per_minute.timestamp,
        y=pd_expected_cycles_per_minute.expected_cycles_per_minute,
        name="Eingestellte Takte / Minute",
        line_shape="hv", opacity=0.5
    )
)
fig.update_layout(
    title="Tatsächliche vs. eingestellte vs. angestrebte Taktleistung",
    yaxis_title="Takte / Minute",
    legend=dict(
        orientation="h",
        yanchor="top",
        y=-0.1,
        xanchor="center",
        x=0.5
    )
)
fig.show()
fig.write_html(file=os.path.join(results_path, r"plots\cycles_per_minute.html"))

## Menge an produzierten Packungen

In [7]:
data_file_name = r"package_data.csv"
data_file = os.path.join(data_file_path, data_file_name)

input_schema = StructType([
    StructField("machine_identifier", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("good_packs", IntegerType()),
    StructField("reject_packs", IntegerType())
])
package_data = spark.read.csv(
    data_file,
    schema=input_schema,
    header=True
)
print(f"Number of rows in {data_file_name}: {package_data.count()}\n")
package_data.printSchema()
package_data.show(truncate=False)

Number of rows in package_data.csv: 270155

root
 |-- machine_identifier: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- good_packs: integer (nullable = true)
 |-- reject_packs: integer (nullable = true)

+-------------------------+-----------------------+----------+------------+
|machine_identifier       |timestamp              |good_packs|reject_packs|
+-------------------------+-----------------------+----------+------------+
|innovative_cheese_machine|2022-05-04 15:05:49.295|23        |1           |
|innovative_cheese_machine|2022-05-04 03:15:28.947|24        |0           |
|innovative_cheese_machine|2022-05-04 08:13:54.767|20        |4           |
|innovative_cheese_machine|2022-05-04 05:35:29.769|24        |0           |
|innovative_cheese_machine|2022-05-04 05:53:27.95 |24        |0           |
|innovative_cheese_machine|2022-05-04 02:21:29.676|24        |0           |
|innovative_cheese_machine|2022-05-04 06:11:55.258|24        |0           |
|innovat

In [8]:
# sum of "good_packs" and "reject_packs" is 24?
sum_not_24 = package_data.filter((package_data.good_packs + package_data.reject_packs).alias("sum_packs") != 24)
assert sum_not_24.count() == 0, \
    (f"Error! There are {sum_not_24.count()} rows in {data_file_name} where sum of good and reject packs is not 24",
     sum_not_24.show(1000, truncate=False)
    )

+-------------------------+-----------------------+----------+------------+
|machine_identifier       |timestamp              |good_packs|reject_packs|
+-------------------------+-----------------------+----------+------------+
|innovative_cheese_machine|2022-05-03 10:04:32.68 |0         |0           |
|innovative_cheese_machine|2022-05-03 10:04:46.17 |0         |0           |
|innovative_cheese_machine|2022-05-03 11:25:00.71 |0         |0           |
|innovative_cheese_machine|2022-05-03 11:25:14.595|0         |0           |
|innovative_cheese_machine|2022-05-03 11:25:37.41 |0         |0           |
|innovative_cheese_machine|2022-05-31 16:49:32.279|23        |5           |
|innovative_cheese_machine|2022-06-10 22:03:45.551|0         |8           |
|innovative_cheese_machine|2022-06-20 05:51:18.94 |0         |0           |
|innovative_cheese_machine|2022-05-26 01:00:48.937|0         |0           |
|innovative_cheese_machine|2022-07-07 14:42:30.36 |0         |0           |
|innovative_

AssertionError: ('Error! There are 207 rows in package_data.csv where sum of good and reject packs is not 24', None)

In [9]:
# chronologically order by timestamp (from oldest to recent)
package_data_chrono = package_data.orderBy("timestamp")
# package_data_chrono.show(100, truncate=False)

sum_packs = package_data_chrono.\
    select([
        sum(package_data_chrono.good_packs).alias("sum_good_packs"),
        sum(package_data_chrono.reject_packs).alias("sum_reject_packs")
    ])

In [10]:
# Plot showing total amount of produced packs
fig = go.Figure(data=go.Pie(
    labels=["Summe Gutpackungen", "Summe Schlechtpackungen"],
    values=list(sum_packs.first().asDict().values()),
    textinfo="value",
    hoverinfo='label+percent',
    marker=dict(line=dict(color='#000000', width=2))
))
fig.update_layout(
    title_text=f"Produzierte Packungen vom {package_data_chrono.first().timestamp.strftime('%d.%m.%Y')} bis {package_data.orderBy('timestamp', ascending=False).first().timestamp.strftime('%d.%m.%Y')}",
    separators=". ")
fig.show()
fig.write_html(file=os.path.join(results_path, r"plots\produced_packs.html"))

In [11]:
# Plot showing total amount of produced packs per month
monthly_produced_packages = package_data.\
    groupBy(month(col("timestamp")).alias("month")).\
        agg(
            sum(col("good_packs")).alias("monthly_good_packs"),
            sum(col("reject_packs")).alias("monthly_reject_packs")
        ).\
    orderBy(col("month"))
pd_monthly_produced_packages = monthly_produced_packages.toPandas()

fig = make_subplots(
    rows=1, cols=pd_monthly_produced_packages.shape[0],
    specs=[[{"type": "domain"} for _ in range(pd_monthly_produced_packages.shape[0])]]
)
for c in range(pd_monthly_produced_packages.shape[0]):
    fig.add_trace(
        go.Pie(
            labels=["Summe Gutpackungen", "Summe Schlechtpackungen"],
            values=pd_monthly_produced_packages.iloc[c, 1:],
            title=f"Monat {pd_monthly_produced_packages.iloc[c, 0]}",
            textinfo="value",
            hoverinfo='label+percent',
            marker=dict(line=dict(color='#000000', width=2))
        ),
        row=1, col=c+1
    )
fig.update_layout(
    title_text=f"Produzierte Packungen je Monat",
    separators=". ")
fig.show()
fig.write_html(file=os.path.join(results_path, r"plots\produced_packs_per_month.html"))

## Probleme / Schlechtpackungen bei "vegan parmesan"

In [12]:
data_file_name = r"recipe_data.csv"
data_file = os.path.join(data_file_path, data_file_name)

input_schema = StructType([
    StructField("machine_identifier", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("recipe", StringType())
])
recipe_data = spark.read.csv(
    data_file,
    schema=input_schema,
    header=True
)
print(f"Number of rows in {data_file_name}: {recipe_data.count()}\n")
recipe_data.printSchema()
recipe_data.show(truncate=False)

Number of rows in recipe_data.csv: 90

root
 |-- machine_identifier: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- recipe: string (nullable = true)

+-------------------------+-----------------------+-------------------+
|machine_identifier       |timestamp              |recipe             |
+-------------------------+-----------------------+-------------------+
|innovative_cheese_machine|2022-06-06 22:51:16.477|cheese sticks      |
|innovative_cheese_machine|2022-05-02 14:28:08.513|vegan parmesan     |
|innovative_cheese_machine|2022-05-31 10:04:17.208|vegan cheese sticks|
|innovative_cheese_machine|2022-07-07 13:04:10.479|vegan hero 150g    |
|innovative_cheese_machine|2022-06-23 00:19:12.227|vegan hero 150g    |
|innovative_cheese_machine|2022-07-12 10:18:49.266|vegan hero 150g    |
|innovative_cheese_machine|2022-05-30 14:17:53.156|vegan cheese sticks|
|innovative_cheese_machine|2022-05-27 11:00:49.677|vegan parmesan     |
|innovative_cheese_machine|2022

In [13]:
# chronologically order by timestamp (from oldest to recent)
recipe_data_chrono = recipe_data.orderBy("timestamp")
recipe_data_chrono.show(100, truncate=False)

+-------------------------+-----------------------+-------------------+
|machine_identifier       |timestamp              |recipe             |
+-------------------------+-----------------------+-------------------+
|innovative_cheese_machine|2022-05-02 06:26:37.727|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 06:26:54.623|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 07:18:23.93 |vegan parmesan     |
|innovative_cheese_machine|2022-05-02 08:08:38.26 |vegan parmesan     |
|innovative_cheese_machine|2022-05-02 08:46:10.376|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 09:13:14.861|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 09:22:22.632|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 14:28:08.513|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 14:51:49.959|vegan parmesan     |
|innovative_cheese_machine|2022-05-02 15:22:22.631|vegan parmesan     |
|innovative_cheese_machine|2022-05-03 09:21:44.886|vegan parmesa

In [14]:
# assign recipe to each done machine cycle
package_data_chrono_expanded_by_recipe = package_data.alias("pd").\
    join(recipe_data.alias("rd"), on=col("rd.timestamp") < col("pd.timestamp")).\
        select(
            "pd.*",
            first("rd.recipe").over(
                Window.partitionBy("pd.timestamp").orderBy(col("rd.timestamp").desc())
            ).alias("recipe")
        ).distinct()\
        .orderBy("pd.timestamp")

# package_data_chrono_expanded_by_recipe.show(100, truncate=False)

In [15]:
RECIPE = "vegan parmesan"
# filter "recipe" and aggreate over daily produced packs (good and reject)
daily_produced_packs = package_data_chrono_expanded_by_recipe.\
    filter(col("recipe") == RECIPE).\
    withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd E")).\
    groupBy(col("date")).\
        agg(
            sum("good_packs").alias("daily_good_packs"),
            sum("reject_packs").alias("daily_reject_packs")
        ).\
    withColumn(
        "daily_reject_packs_percentage",
        format_number(col("daily_reject_packs") / (col("daily_reject_packs") + col("daily_good_packs")) * 100, 2)
    ).\
    orderBy("date")
daily_produced_packs.show(100, truncate=False)

+--------------+----------------+------------------+-----------------------------+
|date          |daily_good_packs|daily_reject_packs|daily_reject_packs_percentage|
+--------------+----------------+------------------+-----------------------------+
|2022-05-03 Tue|84402           |2934              |3.36                         |
|2022-05-04 Wed|143584          |7063              |4.69                         |
|2022-05-05 Thu|136486          |5618              |3.95                         |
|2022-05-06 Fri|84222           |5034              |5.64                         |
|2022-05-07 Sat|0               |120               |100.00                       |
|2022-05-08 Sun|5               |91                |94.79                        |
|2022-05-09 Mon|79863           |4089              |4.87                         |
|2022-05-10 Tue|104147          |4909              |4.50                         |
|2022-05-11 Wed|110662          |5234              |4.52                         |
|202

In [16]:
pd_daily_produced_packs = daily_produced_packs.toPandas()
fig = go.Figure(data=[
    go.Bar(name='Täglich produzierte Gutpackungen',
           x=pd_daily_produced_packs.date, 
           y=pd_daily_produced_packs.daily_good_packs),
    go.Bar(name='Täglich produzierte Schlechtpackungen',
           x=pd_daily_produced_packs.date,
           y=pd_daily_produced_packs.daily_reject_packs,
           text=pd_daily_produced_packs.daily_reject_packs_percentage.apply(lambda x: f"{x}%"))
])
# Change the bar mode
fig.update_layout(barmode='stack')
fig.update_layout(
    title=f"{RECIPE}: Täglich produzierte Packungen",
    yaxis_title="Anzahl Packungen",
    legend=dict(
       orientation="h",
       yanchor="bottom",
       y=1,
       xanchor="center",
       x=0.5
    )
)
fig.update_xaxes(tickangle=-90)
# fig.update_yaxes(type="log")
fig.show()
fig.write_html(file=os.path.join(results_path, rf"plots\{RECIPE}_daily_produced_packs.html"))

## Hypothese: Vor allem unterbrochene Lichtschranken führen zu vielen Stillständen

In [17]:
data_file_name = r"error_messages_accumulated.csv"
data_file = os.path.join(data_file_path, data_file_name)

input_schema = StructType([
    StructField("machine_identifier", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("accumulated_dead_time", DecimalType(10, 3)),
    StructField("identifier", StringType()),
    StructField("station", IntegerType()),
    StructField("station_instance", IntegerType()),
    StructField("subsystem_instance", IntegerType()),
    StructField("code", StringType())
])
error_messages_accumulated = spark.read.csv(
    data_file,
    schema=input_schema,
    header=True
)
print(f"Number of rows in {data_file_name}: {error_messages_accumulated.count()}\n")
error_messages_accumulated.printSchema()
error_messages_accumulated.show(truncate=False)

Number of rows in error_messages_accumulated.csv: 1159

root
 |-- machine_identifier: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- accumulated_dead_time: decimal(10,3) (nullable = true)
 |-- identifier: string (nullable = true)
 |-- station: integer (nullable = true)
 |-- station_instance: integer (nullable = true)
 |-- subsystem_instance: integer (nullable = true)
 |-- code: string (nullable = true)

+-------------------------+-------------------+---------------------+--------------------+-------+----------------+------------------+----+
|machine_identifier       |timestamp          |accumulated_dead_time|identifier          |station|station_instance|subsystem_instance|code|
+-------------------------+-------------------+---------------------+--------------------+-------+----------------+------------------+----+
|innovative_cheese_machine|2022-05-04 02:00:00|35.973               |1.0.130.1.1010.0.1.0|1      |0               |1                 |1010|
|innov

In [18]:
# chronologically order by timestamp (from oldest to recent)
error_messages_accumulated_chrono = error_messages_accumulated.orderBy("timestamp")
error_messages_accumulated_chrono.show(100, truncate=False)

+-------------------------+-------------------+---------------------+----------------------+-------+----------------+------------------+----+
|machine_identifier       |timestamp          |accumulated_dead_time|identifier            |station|station_instance|subsystem_instance|code|
+-------------------------+-------------------+---------------------+----------------------+-------+----------------+------------------+----+
|innovative_cheese_machine|2022-05-03 10:00:00|345.806              |20.0.116.1.1181.0.1.0 |20     |0               |1                 |1181|
|innovative_cheese_machine|2022-05-03 11:00:00|30.069               |93.0.0.0.2030.0.84.0  |93     |0               |0                 |2030|
|innovative_cheese_machine|2022-05-03 11:00:00|35.693               |92.0.122.0.1019.0.1.0 |92     |0               |0                 |1019|
|innovative_cheese_machine|2022-05-03 11:00:00|36.628               |93.0.122.0.1019.0.1.0 |93     |0               |0                 |1019|
|innov

In [19]:
data_file_name = r"error_messages_timeline.csv"
data_file = os.path.join(data_file_path, data_file_name)

input_schema = StructType([
    StructField("machine_identifier", StringType()),
    StructField("start_ts", TimestampType()),
    StructField("end_ts", TimestampType()),
    StructField("duration_in_s", DecimalType(10, 3)),
    StructField("identifier", StringType()),
    StructField("station", IntegerType()),
    StructField("station_instance", IntegerType()),
    StructField("subsystem_instance", IntegerType()),
    StructField("code", StringType())
])
error_messages_timeline = spark.read.csv(
    data_file,
    schema=input_schema,
    header=True
)
print(f"Number of rows in {data_file_name}: {error_messages_timeline.count()}\n")
error_messages_timeline.printSchema()
error_messages_timeline.show(truncate=False)

Number of rows in error_messages_timeline.csv: 1593

root
 |-- machine_identifier: string (nullable = true)
 |-- start_ts: timestamp (nullable = true)
 |-- end_ts: timestamp (nullable = true)
 |-- duration_in_s: decimal(10,3) (nullable = true)
 |-- identifier: string (nullable = true)
 |-- station: integer (nullable = true)
 |-- station_instance: integer (nullable = true)
 |-- subsystem_instance: integer (nullable = true)
 |-- code: string (nullable = true)

+-------------------------+-----------------------+-----------------------+-------------+---------------------+-------+----------------+------------------+----+
|machine_identifier       |start_ts               |end_ts                 |duration_in_s|identifier           |station|station_instance|subsystem_instance|code|
+-------------------------+-----------------------+-----------------------+-------------+---------------------+-------+----------------+------------------+----+
|innovative_cheese_machine|2022-05-03 10:04:47.337|202

In [20]:
# chronologically order by timestamp (from oldest to recent)
error_messages_timeline_chrono = error_messages_timeline.orderBy("start_ts")
error_messages_timeline_chrono.show(100, truncate=False)

+-------------------------+-----------------------+-----------------------+-------------+----------------------+-------+----------------+------------------+----+
|machine_identifier       |start_ts               |end_ts                 |duration_in_s|identifier            |station|station_instance|subsystem_instance|code|
+-------------------------+-----------------------+-----------------------+-------------+----------------------+-------+----------------+------------------+----+
|innovative_cheese_machine|2022-05-03 10:04:47.337|2022-05-03 10:06:19.613|92.276       |20.0.116.1.1181.0.1.0 |20     |0               |1                 |1181|
|innovative_cheese_machine|2022-05-03 10:06:19.637|2022-05-03 10:06:32.73 |13.093       |20.0.116.1.1181.0.1.0 |20     |0               |1                 |1181|
|innovative_cheese_machine|2022-05-03 10:06:32.97 |2022-05-03 10:06:44.346|11.376       |20.0.116.1.1181.0.1.0 |20     |0               |1                 |1181|
|innovative_cheese_machine|2

In [21]:
# count shutdowns total and shutdowns caused by light barriers
shutdowns_total = error_messages_timeline.count()
shutdowns_light_barrier = error_messages_timeline.filter(col('code') == '1019').count()
print(
    f"Stillstände insgesamt {shutdowns_total},",
    f"davon durch unterbrochene Lichtschranken verursacht {shutdowns_light_barrier}",
    f"(prozentual: {shutdowns_light_barrier/shutdowns_total*100:.2f}%)"
)

Stillstände insgesamt 1593, davon durch unterbrochene Lichtschranken verursacht 275 (prozentual: 17.26%)


In [22]:
# count shutdowns per error code
shutdowns_per_error = error_messages_timeline.\
    groupBy(col("code")).\
        agg(
            count("code").alias("count_shutdowns")
        ).\
    withColumn(
        "percentage_shutdowns",
        format_number(col("count_shutdowns") / error_messages_timeline.count() * 100, 2)
    ).\
    orderBy(col("code"))
print("Stillstände je FehlerID (code):")
shutdowns_per_error.show(100, truncate=False)
print("FehlerID (code) mit den meisten Stillständen (Top 3):")
shutdowns_per_error.\
    withColumn("row", row_number().over(Window.partitionBy().orderBy(col("count_shutdowns").desc()))).\
    filter(col("row") <= 3).drop(col("row")).\
    show(truncate=False)

Stillstände je FehlerID (code):
+----+---------------+--------------------+
|code|count_shutdowns|percentage_shutdowns|
+----+---------------+--------------------+
|1001|29             |1.82                |
|1005|8              |0.50                |
|1006|1              |0.06                |
|1010|155            |9.73                |
|1013|3              |0.19                |
|1019|275            |17.26               |
|1024|2              |0.13                |
|1036|9              |0.56                |
|1082|7              |0.44                |
|1095|23             |1.44                |
|1130|1              |0.06                |
|1131|1              |0.06                |
|1152|11             |0.69                |
|1168|72             |4.52                |
|1171|19             |1.19                |
|1175|53             |3.33                |
|1176|1              |0.06                |
|1177|2              |0.13                |
|1178|17             |1.07                |


In [23]:
# aggregate dead time per error code
dead_time_per_error = error_messages_accumulated.\
    groupBy(col("code")).\
        agg(
            sum("accumulated_dead_time").alias("total_accumulated_dead_time_in_s")
        ).\
    withColumn(
        "percentage_total_accumulated_dead_time",
        format_number(col("total_accumulated_dead_time_in_s") / sum(col("total_accumulated_dead_time_in_s")).over(Window.partitionBy()) * 100, 2)
    ).\
    orderBy(col("code"))
print("Stillstandszeit insgesamt:")
dead_time_per_error.groupBy().sum('total_accumulated_dead_time_in_s').show(truncate=False)
print("Stillstandszeit je FehlerID (code):")
dead_time_per_error.show(100, truncate=False)
print("FehlerID (code) mit der akkumuliert meisten Stillstandszeit (Top 3):")
dead_time_per_error.\
    withColumn("row", row_number().over(Window.partitionBy().orderBy(col("total_accumulated_dead_time_in_s").desc()))).\
    filter(col("row") <= 3).drop(col("row")).\
    show(truncate=False)

Stillstandszeit insgesamt:
+-------------------------------------+
|sum(total_accumulated_dead_time_in_s)|
+-------------------------------------+
|184757.371                           |
+-------------------------------------+

Stillstandszeit je FehlerID (code):
+----+--------------------------------+--------------------------------------+
|code|total_accumulated_dead_time_in_s|percentage_total_accumulated_dead_time|
+----+--------------------------------+--------------------------------------+
|1001|3015.154                        |1.63                                  |
|1005|2239.476                        |1.21                                  |
|1006|41.648                          |0.02                                  |
|1010|22627.888                       |12.25                                 |
|1013|0.000                           |0.00                                  |
|1019|14754.290                       |7.99                                  |
|1024|60.388             

In [24]:
# plot stats regarding shutdown and dead time per error
pd_stats_per_error = shutdowns_per_error.alias("sd").\
    join(dead_time_per_error.alias("dt"), col("sd.code") == col("dt.code")).\
        select("sd.*", "dt.total_accumulated_dead_time_in_s", "dt.percentage_total_accumulated_dead_time").\
        orderBy(col("sd.code")).toPandas()

fig = make_subplots(rows=2, cols=1)
fig.add_trace(
    go.Bar(name='Anzahl Stillstände je FehlerID',
           x=pd_stats_per_error.code, 
           y=pd_stats_per_error.count_shutdowns,
           text=pd_stats_per_error.percentage_shutdowns.apply(lambda x: f"{x}%"),
           textangle=-90),
    row=1, col=1
)
fig.add_trace(
    go.Bar(name='Gesamte Stillstandszeit je FehlerID',
           x=pd_stats_per_error.code, 
           y=pd_stats_per_error.total_accumulated_dead_time_in_s,
           text=pd_stats_per_error.percentage_total_accumulated_dead_time.apply(lambda x: f"{x}%"),
           textangle=-90),
    row=2, col=1
)
fig.update_layout(
    title_text=f"Stillstände und Stillstandszeiten je FehlerID",
    xaxis2_title="FehlerID (code)",
    yaxis1_title="Anzahl",
    yaxis2_title="Sekunden",
    legend=dict(
       orientation="h",
       yanchor="top",
       y=-0.25,
       xanchor="center",
       x=0.5
    )
)
fig.update_xaxes(tickangle=-90)
fig.show()
fig.write_html(file=os.path.join(results_path, r"plots\stats_per_error.html"))

## Prognose der Packungsproduktion

### Vorabanalyse Tägliche Produktion

In [25]:
# aggreate over daily produced packs (good and reject)
daily_produced_packs_all = package_data_chrono_expanded_by_recipe.\
    withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd E")).\
    groupBy(col("date")).\
        agg(
            sum("good_packs").alias("daily_good_packs"),
            sum("reject_packs").alias("daily_reject_packs")
        ).\
    withColumn(
        "daily_reject_packs_percentage",
        format_number(col("daily_reject_packs") / (col("daily_reject_packs") + col("daily_good_packs")) * 100, 2)
    ).\
    orderBy("date")
daily_produced_packs_all.show(100, truncate=False)

+--------------+----------------+------------------+-----------------------------+
|date          |daily_good_packs|daily_reject_packs|daily_reject_packs_percentage|
+--------------+----------------+------------------+-----------------------------+
|2022-05-03 Tue|84402           |2934              |3.36                         |
|2022-05-04 Wed|143584          |7063              |4.69                         |
|2022-05-05 Thu|136486          |5618              |3.95                         |
|2022-05-06 Fri|84222           |5034              |5.64                         |
|2022-05-07 Sat|0               |120               |100.00                       |
|2022-05-08 Sun|5               |91                |94.79                        |
|2022-05-09 Mon|79863           |4089              |4.87                         |
|2022-05-10 Tue|104147          |4909              |4.50                         |
|2022-05-11 Wed|110662          |5234              |4.52                         |
|202

In [26]:
pd_daily_produced_packs_all = daily_produced_packs_all.toPandas()
fig = go.Figure(data=[
    go.Bar(name='Täglich produzierte Gutpackungen',
           x=pd_daily_produced_packs_all.date, 
           y=pd_daily_produced_packs_all.daily_good_packs),
    go.Bar(name='Täglich produzierte Schlechtpackungen',
           x=pd_daily_produced_packs_all.date,
           y=pd_daily_produced_packs_all.daily_reject_packs,
           text=pd_daily_produced_packs_all.daily_reject_packs_percentage.apply(lambda x: f"{x}%"))
])
# Change the bar mode
fig.update_layout(barmode='stack')
fig.update_layout(
    title=f"Täglich produzierte Packungen",
    yaxis_title="Anzahl Packungen",
    legend=dict(
       orientation="h",
       yanchor="bottom",
       y=1,
       xanchor="center",
       x=0.5
    )
)
fig.update_xaxes(tickangle=-90)
# fig.update_yaxes(type="log")
fig.show()

### Vorabanalyse Stündliche Produktion

In [27]:
# aggreate over hourly produced packs (good and reject)
hourly_produced_packs_all = package_data_chrono_expanded_by_recipe.\
    withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd HH E")).\
    groupBy(col("date")).\
        agg(
            sum("good_packs").alias("hourly_good_packs"),
            sum("reject_packs").alias("hourly_reject_packs")
        ).\
    withColumn(
        "hourly_reject_packs_percentage",
        format_number(col("hourly_reject_packs") / (col("hourly_reject_packs") + col("hourly_good_packs")) * 100, 2)
    ).\
    orderBy("date")
hourly_produced_packs_all.show(100, truncate=False)

+-----------------+-----------------+-------------------+------------------------------+
|date             |hourly_good_packs|hourly_reject_packs|hourly_reject_packs_percentage|
+-----------------+-----------------+-------------------+------------------------------+
|2022-05-03 10 Tue|0                |120                |100.00                        |
|2022-05-03 11 Tue|2171             |397                |15.46                         |
|2022-05-03 12 Tue|4713             |375                |7.37                          |
|2022-05-03 13 Tue|3855             |153                |3.82                          |
|2022-05-03 14 Tue|6498             |54                 |0.82                          |
|2022-05-03 15 Tue|8980             |140                |1.54                          |
|2022-05-03 16 Tue|9348             |108                |1.14                          |
|2022-05-03 17 Tue|5765             |307                |5.06                          |
|2022-05-03 18 Tue|32

In [28]:
pd_hourly_produced_packs_all = hourly_produced_packs_all.toPandas()
fig = go.Figure(data=[
    go.Bar(name='Stündlich produzierte Gutpackungen',
           x=pd_hourly_produced_packs_all.date, 
           y=pd_hourly_produced_packs_all.hourly_good_packs),
    go.Bar(name='Stündlich produzierte Schlechtpackungen',
           x=pd_hourly_produced_packs_all.date,
           y=pd_hourly_produced_packs_all.hourly_reject_packs,
           text=pd_hourly_produced_packs_all.hourly_reject_packs_percentage.apply(lambda x: f"{x}%"))
])
# Change the bar mode
fig.update_layout(barmode='stack')
fig.update_layout(
    title=f"Stündlich produzierte Packungen",
    yaxis_title="Anzahl Packungen",
    legend=dict(
       orientation="h",
       yanchor="bottom",
       y=1,
       xanchor="center",
       x=0.5
    )
)
fig.update_xaxes(tickangle=-90)
# fig.update_yaxes(type="log")
fig.show()

### Prognose der täglichen Gesamtproduktion an Gutpackungen

In [39]:
pd_daily_produced_good_packs_all = pd_daily_produced_packs_all[["date", "daily_good_packs"]].rename(columns={"date": "ds", "daily_good_packs": "y"})
# boundaries for logistic growth model
pd_daily_produced_good_packs_all["cap"] = 200000.0
pd_daily_produced_good_packs_all["floor"] = 0.0
# instantiate the model and set parameters
model = Prophet(
    interval_width=0.95, # uncertainty level
    growth='logistic',
    daily_seasonality="auto",
    weekly_seasonality=True, # above investigations show weekly trends
    seasonality_mode='multiplicative',
    seasonality_prior_scale=5.0,
    mcmc_samples=10000,
)

# fit the model to historical daily produced good packs (all) data
model.fit(pd_daily_produced_good_packs_all)

07:53:16 - cmdstanpy - INFO - CmdStan installation c:\Users\user\Desktop\InnovativeCheeseProducingCompany\.venv\lib\site-packages\prophet\stan_model\cmdstan-2.26.1 missing makefile, cannot get version.
07:53:16 - cmdstanpy - INFO - Cannot determine whether version is before 2.28.
07:53:16 - cmdstanpy - INFO - CmdStan start processing


chain 1 |          | 00:00 Status

chain 2 |          | 00:00 Status

chain 3 |          | 00:00 Status

chain 4 |          | 00:00 Status

                                                                                                                                                                                                                                                                                                                                

07:53:50 - cmdstanpy - INFO - CmdStan done processing.
Exception: normal_id_glm_lpdf: Scale vector is 0, but must be positive finite! (in 'D:/a/prophet/prophet/python/stan/prophet.stan', line 137, column 2 to line 142, column 4)
Exception: normal_id_glm_lpdf: Scale vector is 0, but must be positive finite! (in 'D:/a/prophet/prophet/python/stan/prophet.stan', line 137, column 2 to line 142, column 4)
Exception: normal_id_glm_lpdf: Scale vector is inf, but must be positive finite! (in 'D:/a/prophet/prophet/python/stan/prophet.stan', line 137, column 2 to line 142, column 4)
Exception: normal_id_glm_lpdf: Scale vector is 0, but must be positive finite! (in 'D:/a/prophet/prophet/python/stan/prophet.stan', line 137, column 2 to line 142, column 4)
Consider re-running with show_console=True if the above output is unclear!





	Chain 2 had 1 divergent transitions (0.0%)
	Chain 3 had 90 divergent transitions (1.8%)
	Chain 4 had 5 divergent transitions (0.1%)
	Use function "diagnose()" to see further information.


<prophet.forecaster.Prophet at 0x17a4e289130>

In [40]:
# look 60 days ahead
pd_future = model.make_future_dataframe(
    periods=60,
    freq='D',
    include_history=True
)
# boundaries for logistic growth model
pd_future["cap"] = 200000.0
pd_future["floor"] = 0.0

# predict over the dataset
pd_historical_and_forecast = model.predict(pd_future)

In [41]:
fig_predict = plot_plotly(model, pd_historical_and_forecast, xlabel='Datum', ylabel='Anzahl Gutpackungen')
fig_predict.show()

In [42]:
fig_components = plot_components_plotly(model, pd_historical_and_forecast, figsize = (900, 400))
fig_components.show()

In [43]:
end_historical = pd_daily_produced_good_packs_all.ds.max()
fig = go.Figure(data=[
    go.Bar(name='Täglich produzierte Gutpackungen historisch',
           x=pd_daily_produced_good_packs_all.ds, 
           y=pd_daily_produced_good_packs_all.y),
    go.Bar(name='Täglich produzierte Gutpackungen prognostiziert',
           x=pd_historical_and_forecast[pd_historical_and_forecast.ds > end_historical].ds.dt.strftime("%Y-%m-%d %a"), 
           y=pd_historical_and_forecast[pd_historical_and_forecast.ds > end_historical].yhat) # yhat are the forecasts
])
# Change the bar mode
fig.update_layout(barmode='stack')
fig.update_layout(
    title=f"Täglich produzierte Gutpackungen: historisch und prognostiziert",
    yaxis_title="Anzahl Packungen",
    legend=dict(
       orientation="h",
       traceorder="normal",
       yanchor="bottom",
       y=1,
       xanchor="center",
       x=0.5
    )
)
fig.update_xaxes(tickangle=-90)
# fig.update_yaxes(type="log")
fig.show()
fig.write_html(file=os.path.join(results_path, r"plots\history_and_forecast_good_packs.html"))