In [0]:
import pyspark.sql.functions as fcn
import pandas
from datetime import datetime, timedelta
from pyspark.sql.window import Window
import os
from marel.services import ServiceProvider
from marel.databricks import DatabricksProvider
# Instantiate the project's Databricks provider.
# NOTE(review): `provider` is not referenced by any of the cells visible here —
# presumably it is used further down or constructed for its side effects; confirm.
provider = DatabricksProvider()

In [0]:
# Lookback horizon in months, counted back from the current date.
Months = 9

# X-ray controller signals (plus time and device id) kept from the telemetry table.
telemetry_columns = [
    "timeStamp",
    "properties_deviceId",
    "payload_xrayController_filamentCurrent",
    "payload_xrayController_temperature",
    "payload_xrayController_tubeCurrent",
    "payload_xrayController_tubeVoltage",
    "payload_xrayController_onTime",
]

# Rows strictly newer than this cutoff are kept.
telemetry_cutoff = fcn.add_months(fcn.current_date(), -Months)

df_sx_telemetry = (
    spark.table('marel_digital_machines.sensorx.marel_sensorx_telemetry')
    .where(fcn.col('timeStamp') > telemetry_cutoff)
    .select(*telemetry_columns)
)

# Mark the filtered frame for caching so repeated use does not re-read the table.
df_sx_telemetry.cache()

In [0]:
# Lookback horizon in months, counted back from the current date.
Months = 9

# Rows strictly newer than this cutoff are kept.
serialnumber_cutoff = fcn.add_months(fcn.current_date(), -Months)

# One row per (device, serial number) pair, carrying the earliest and latest
# timestamp at which that pair appears inside the lookback horizon.
df_sx_serialnumber = (
    spark.table("marel_digital_machines.sensorx.marel_sensorx_xraycontroller_property_serialnumber")
    .where(fcn.col("timeStamp") > serialnumber_cutoff)
    .groupBy("properties_deviceId", "payload_serialNumber")
    .agg(
        fcn.min(fcn.col("timeStamp")).alias("FirstTimestamp"),
        fcn.max(fcn.col("timeStamp")).alias("NewestTimestamp"),
    )
)


In [0]:
from pyspark.sql import functions as fcn, Window

# Bin size handed to the range-join hint (one day expressed in seconds).
BIN_SIZE_SECONDS = 86400

T = df_sx_telemetry.alias("T")
S = df_sx_serialnumber.alias("S")

# A telemetry row matches a serial-number row when both belong to the same device
# and the telemetry timestamp lies in [FirstTimestamp, NewestTimestamp], both ends inclusive.
serial_same_device = fcn.col("T.properties_deviceId") == fcn.col("S.properties_deviceId")
serial_not_before_first = fcn.col("T.timeStamp") >= fcn.col("S.FirstTimestamp")
serial_not_after_newest = fcn.col("T.timeStamp") <= fcn.col("S.NewestTimestamp")

# Left join: telemetry rows without a matching interval are kept with a null serialNumber.
# NOTE(review): if a device has overlapping serial-number intervals, a telemetry row
# would match several rows of S and be duplicated here — confirm this cannot happen.
df_sx_telemetry_serialnumber = (
    T.hint("range_join", BIN_SIZE_SECONDS)
    .join(
        S,
        on=serial_same_device & serial_not_before_first & serial_not_after_newest,
        how="left",
    )
    .select("T.*", fcn.col("S.payload_serialNumber").alias("serialNumber"))
)

df_sx_telemetry_serialnumber.cache()

To be tested later:

In [0]:
# Backward fill of serialNumber. Rows of each device are ordered newest-first, and
# with the default window frame (start of partition up to the current row) the last
# non-null value seen is the one from the nearest row at the same or a later time.
window_spec = (
    Window
    .partitionBy("properties_deviceId")
    .orderBy(fcn.desc("timeStamp"))
)

filled_serial_number = fcn.last("serialNumber", ignorenulls=True).over(window_spec)

# Overwrite serialNumber with the filled value; all other columns are untouched.
df_sx_telemetry_serialnumber_filled = df_sx_telemetry_serialnumber.withColumn(
    "serialNumber", filled_serial_number
)

display(df_sx_telemetry_serialnumber_filled)

In [0]:
from pyspark.sql import functions as F, Window

# Lookback horizon in months, counted back from the current date.
Months = 9

# Keep only events that are inside the lookback horizon and whose fault name
# is exactly "faultRegulation".
fault_is_recent = fcn.col('timeStamp') > fcn.add_months(fcn.current_date(), -Months)
fault_is_regulation = fcn.col('payload_fault_faultName') == "faultRegulation"

df_sx_xraycontroller_fault = (
    spark.table('marel_digital_machines.sensorx.marel_sensorx_xraycontroller_property_fault')
    .where(fault_is_recent & fault_is_regulation)
    .select(
        "timeStamp",
        "properties_deviceId",
        "payload_fault_faultName",
        "payload_fault_state",
    )
)

# Mark for caching before the frame is displayed and joined.
df_sx_xraycontroller_fault.cache()

display(df_sx_xraycontroller_fault)

In [0]:

from pyspark.sql import functions as F, Window

# Telemetry side of the join; timestamps are compared as-is, without casting.
telemetry = df_sx_telemetry_serialnumber.alias("t")

# Per-device window in chronological order, used to look up the next fault event.
w = Window.partitionBy("properties_deviceId").orderBy("timeStamp")

# Each fault event becomes an interval that starts at its own timestamp and ends at
# the timestamp of the device's next fault event (null for the device's last event).
next_fault_timestamp = F.lead("timeStamp").over(w)
fault_int = df_sx_xraycontroller_fault.withColumn("LastTimeStamp", next_fault_timestamp).alias("f")

# Join predicates: same device, telemetry at or after the interval start, and either
# an open-ended interval or telemetry strictly before the interval end.
fault_same_device = F.col("t.properties_deviceId") == F.col("f.properties_deviceId")
fault_after_start = F.col("t.timeStamp") >= F.col("f.timeStamp")
fault_before_end = F.col("f.LastTimeStamp").isNull() | (F.col("t.timeStamp") < F.col("f.LastTimeStamp"))

# NOTE(review): the result carries `timeStamp` and `properties_deviceId` from both
# sides, so later references to those names need the "t." / "f." qualifier — confirm
# downstream cells handle this.
df_sx_joined_regu_fault = telemetry.join(
    fault_int,
    on=fault_same_device & fault_after_start & fault_before_end,
    how="inner",
)

display(df_sx_joined_regu_fault)


Plotting known cases of earlier failures from February and March 2025 and pairing them with the corresponding failure messages

In [0]:


# Period containing the known earlier failures: start inclusive, end exclusive.
EARLIER_PERIOD_START = "2025-02-01 00:00:00"
EARLIER_PERIOD_END = "2025-04-01 00:00:00"

# Device ids of the two machines inspected below.
FEB_FAULTY_DEVICE_ID = "08947f89-b3c1-4b1e-ed33-08d9f153eeaf"
MARCH_FAULTY_DEVICE_ID = "0b479140-8371-4c78-b490-6fd37df535bc"

# Shared time filter applied to both the telemetry and the fault table.
in_earlier_period = (
    (F.col("timeStamp") >= F.to_timestamp(F.lit(EARLIER_PERIOD_START))) &
    (F.col("timeStamp") <  F.to_timestamp(F.lit(EARLIER_PERIOD_END)))
)

# Telemetry for the period, reduced to the X-ray controller signals.
df_sx_telemetry_earlier = (
    spark.table("marel_digital_machines.sensorx.marel_sensorx_telemetry")
    .where(in_earlier_period)
    .select(
        "timeStamp",
        "properties_deviceId",
        "payload_xrayController_filamentCurrent",
        "payload_xrayController_temperature",
        "payload_xrayController_tubeCurrent",
        "payload_xrayController_tubeVoltage",
        "payload_xrayController_onTime",
    )
)

# Not referenced by anything in this cell; retained so the notebook-global value is unchanged.
Months = 3

# Fault events for the same period (all fault names, no faultName filter here).
df_sx_xraycontroller_fault_earlier = (
    spark.table('marel_digital_machines.sensorx.marel_sensorx_xraycontroller_property_fault')
    .where(in_earlier_period)
    .select("timeStamp", "properties_deviceId", "payload_fault_faultName", "payload_fault_state")
)

# Telemetry and fault events of the first inspected machine.
is_feb_device = F.col("properties_deviceId") == FEB_FAULTY_DEVICE_ID
feb_faulty_machine = df_sx_telemetry_earlier.filter(is_feb_device)
feb_faulty_machine_fault = df_sx_xraycontroller_fault_earlier.filter(is_feb_device)
display(feb_faulty_machine_fault)

# Telemetry and fault events of the second inspected machine.
is_march_device = F.col("properties_deviceId") == MARCH_FAULTY_DEVICE_ID
march_faulty_machine = df_sx_telemetry_earlier.filter(is_march_device)
march_faulty_machine_fault = df_sx_xraycontroller_fault_earlier.filter(is_march_device)


In [0]:
import matplotlib.pyplot as plt
# Plot the tube current of the first inspected machine over time.

# Spark gives no row-order guarantee without an explicit sort, and a line plot
# connects points in row order, so the collected telemetry is sorted by time
# first; otherwise the line can jump back and forth across the time axis.
ffm = (
    feb_faulty_machine.toPandas()
    .sort_values("timeStamp")
    .reset_index(drop=True)
)
# Fault events of the same machine, collected for pairing with the telemetry.
ffmf = feb_faulty_machine_fault.toPandas()

plt.figure()
plt.plot(ffm["timeStamp"], ffm["payload_xrayController_tubeCurrent"])
plt.xlabel("timeStamp")
plt.ylabel("payload_xrayController_tubeCurrent")
plt.title("X-ray tube current over time (February faulty machine)")
plt.xticks(rotation=45)
plt.show()
