In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Reuse the active Spark session if one exists; otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Load the raw tag export as CSV. The first line is treated as the header
# and column types are inferred from the data.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/workspace/default/vdl/vdl_dummy_with_windows.csv")
)

# Tag ids that are treated as motion events further down the script.
# NOTE(review): "LiftoRaise" and "LifttoRaise" both appear here, and
# "LoweroPloc" (listed in special_rule_sensors) does not -- confirm these
# spellings against the real tag names.
motion_sensors = [
    "SkidIndexOut",
    "LocatorRtn",
    "LifttoLower",
    "SkidIndexIn",
    "RaiseoPloc",
    "LocatorAdv",
    "LifttoRaise",
    "SkidInOut",
    "LiftoRaise",
]

# Tags whose window end is taken from a *later* event instead of from
# their own tagvalue.
special_rule_sensors = ["LoweroPloc", "LocatorAdv"]

# Motion tags that follow the default rule (end = timestamp + tagvalue / 1000).
motion_normal = [
    tag for tag in motion_sensors if tag not in special_rule_sensors
]

# Tags for which a per-window mean and the "significant" flag are computed.
unmentioned_sensors = [
    "VDLM1VS1.XRMSvelocity",
    "VDLM1VS1.ZRMSvelocity",
    "VDLM1VS2.XRMSvelocity",
    "VDLM1VS2.ZRMSvelocity",
    "VDLM2VS1.XRMSvelocity",
    "VDLM2VS1.ZRMSvelocity",
    "VDLM2VS2.XRMSvelocity",
    "VDLM2VS2.ZRMSvelocity",
    "OutputcurrentVFD1",
    "OutputcurrentIDC1",
    "BusvoltageIDC1",
    "TorqueFeedbackVFD1",
    "OutputcurrentVFD2",
    "OutputcurrentIDC2",
    "BusvoltageIDC2",
    "TorqueFeedbackVFD2",
]

# Parse the textual tag timestamp, then keep a numeric copy (cast to double)
# that the later windows sort on and do arithmetic with.
parsed_timestamp = F.to_timestamp(F.col("tagtimestamp"), "MM/dd/yyyy HH:mm:ss:SSSSSS")
df = df.withColumn("timestamp_dt", parsed_timestamp)
df = df.withColumn("timestamp_secs", F.col("timestamp_dt").cast("double"))

# Default rule: a motion tag opens a window at its own timestamp and closes it
# tagvalue / 1000 later (presumably tagvalue is a duration in milliseconds --
# TODO confirm). All other rows get null start/end for now.
is_normal_motion = F.col("tagid").isin(motion_normal)
event_secs = F.col("timestamp_secs")
df = (
    df.withColumn("start", F.when(is_normal_motion, event_secs))
    .withColumn("end", F.when(is_normal_motion, event_secs + F.col("tagvalue") / 1000))
)

# Look-ahead frame: every row strictly after the current one, in timestamp order.
# NOTE(review): the window has no partitionBy, so Spark evaluates it over the
# whole dataset in a single partition.
w_fwd = Window.orderBy("timestamp_secs").rowsBetween(1, Window.unboundedFollowing)

# Special rule for "LoweroPloc": the window starts at the event itself and
# ends where the next "LocatorRtn" event ends (its timestamp + tagvalue / 1000).
is_lower_ploc = F.col("tagid") == "LoweroPloc"
locatorrtn_end = F.when(
    F.col("tagid") == "LocatorRtn",
    F.col("timestamp_secs") + F.col("tagvalue") / 1000,
)

# First non-null LocatorRtn end among the following rows; null if there is none.
df = df.withColumn(
    "next_locatorrtn_end",
    F.first(locatorrtn_end, ignorenulls=True).over(w_fwd),
)
df = df.withColumn(
    "start",
    F.when(is_lower_ploc, F.col("timestamp_secs")).otherwise(F.col("start")),
)
df = df.withColumn(
    "end",
    F.when(is_lower_ploc, F.col("next_locatorrtn_end")).otherwise(F.col("end")),
)

# Special rule for "LocatorAdv": the window starts at the event itself and
# ends where the next "LifttoRaise" event ends (its timestamp + tagvalue / 1000).
is_locator_adv = F.col("tagid") == "LocatorAdv"
liftraise_end = F.when(
    F.col("tagid") == "LifttoRaise",
    F.col("timestamp_secs") + F.col("tagvalue") / 1000,
)

# First non-null LifttoRaise end among the following rows; null if there is none.
df = df.withColumn(
    "next_liftraise_end",
    F.first(liftraise_end, ignorenulls=True).over(w_fwd),
)
df = df.withColumn(
    "start",
    F.when(is_locator_adv, F.col("timestamp_secs")).otherwise(F.col("start")),
)
df = df.withColumn(
    "end",
    F.when(is_locator_adv, F.col("next_liftraise_end")).otherwise(F.col("end")),
)

# Running frame: everything from the first row up to and including the
# current row, in timestamp order. Used for all forward fills below.
w_ffill = Window.orderBy("timestamp_secs").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

# Forward-fill the window bounds so rows without their own start/end inherit
# the most recent non-null values.
# NOTE(review): start and end are filled independently. If an event row has a
# start but a null end (e.g. no later LocatorRtn / LifttoRaise was found), its
# end is taken from an earlier event -- confirm that this is intended.
df = df.withColumn(
    "start",
    F.coalesce(F.col("start"), F.last("start", ignorenulls=True).over(w_ffill)),
)
df = df.withColumn(
    "end",
    F.coalesce(F.col("end"), F.last("end", ignorenulls=True).over(w_ffill)),
)

# The look-ahead helper columns are no longer needed.
df = df.drop("next_locatorrtn_end", "next_liftraise_end")

# Label every row with the tag of the most recent window-opening event.
#
# The start/end windows above are opened by the tags in motion_sensors AND by
# the special-rule tags. "LoweroPloc" is only in special_rule_sensors, so
# labelling from motion_sensors alone left rows inside a LoweroPloc window
# carrying the previous motion's name. Use the union of both lists
# (order-preserving, duplicates removed) so the label matches the window.
window_opening_tags = list(dict.fromkeys(motion_sensors + special_rule_sensors))

df = df.withColumn(
    "submotion",
    F.last(
        # The tag id on event rows, null elsewhere; nulls are skipped so the
        # latest event's tag is carried forward.
        F.when(F.col("tagid").isin(window_opening_tags), F.col("tagid")),
        ignorenulls=True,
    ).over(w_ffill),
)

# Each "SkidIndexOut" event marks the start of a new cycle; the running sum of
# these markers (in timestamp order) numbers the cycle every row belongs to.
# Rows before the first "SkidIndexOut" get 0.
cycle_start = F.when(F.col("tagid") == "SkidIndexOut", 1).otherwise(0)
df = df.withColumn("motion_cycle_count", F.sum(cycle_start).over(w_ffill))

# Average tagvalue per (window, tag) for the monitored tags, attached back
# onto every matching row as "mean". Rows of other tags get a null mean.
# NOTE(review): this is a plain equality join, so rows whose start or end is
# null never match and also end up with a null mean.
window_keys = ["start", "end", "tagid"]
monitored_rows = df.where(F.col("tagid").isin(unmentioned_sensors))
mean_df = monitored_rows.groupBy(*window_keys).agg(F.avg("tagvalue").alias("mean"))
df = df.join(mean_df, on=window_keys, how="left")

# Flag monitored readings that reach at least 1.3x their window mean:
#   1    -> monitored tag, tagvalue >= 1.3 * mean
#   0    -> monitored tag otherwise (including when the comparison is null)
#   null -> any other tag
is_monitored = F.col("tagid").isin(unmentioned_sensors)
reaches_threshold = F.col("tagvalue") >= 1.3 * F.col("mean")
df = df.withColumn(
    "significant",
    F.when(
        is_monitored,
        F.when(reaches_threshold, F.lit(1)).otherwise(F.lit(0)),
    ).otherwise(F.lit(None)),
)

# Columns exposed in the final result, in display order.
output_columns = [
    "tagtimestamp",
    "tagid",
    "tagvalue",
    "start",
    "end",
    "submotion",
    "motion_cycle_count",
    "mean",
    "significant",
]

# Sort chronologically first (timestamp_secs is not part of the output), then
# project down to the reporting columns.
final_df = df.orderBy("timestamp_secs").select(*output_columns)

display(final_df)
