In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, NullType

from pywrangler.pyspark.wranglers import interval_identifier

In [3]:
# Location of the input CSV. Set the OSCILLATION_CSV environment variable to
# run the notebook on another machine; the author's original absolute path is
# kept as the fallback so existing setups keep working.
PATH = os.environ.get(
    "OSCILLATION_CSV",
    r"C:\Users\rasboldt\Desktop\software\pywrangler\notebooks\oscillation_timeseries.csv",
)

In [4]:
# Obtain the Spark session; getOrCreate returns an already running session if
# there is one (e.g. when this cell is re-executed).
spark = (
    SparkSession.builder
    .appName("Oscillation")
    .getOrCreate()
)

In [5]:
# Semicolon-separated file with a header row. No schema inference is requested,
# so columns arrive as strings (hence the timestamp cast in the next cell).
df_all = spark.read.options(sep=";", header=True).csv(PATH)

In [6]:
# Keep only the columns the interval logic needs and make the timestamp an
# integer so ordering is numeric rather than lexicographic.
df = (
    df_all
    .select("machinenumber", "timestamp", "state")
    .withColumn("timestamp", F.col("timestamp").cast(IntegerType()))
)

In [7]:
df.show(n=32, truncate=False)

+-------------+---------+-----+
|machinenumber|timestamp|state|
+-------------+---------+-----+
|Bla          |1        |ende |
|Bla          |2        |noise|
|Bla          |3        |start|
|Bla          |4        |noise|
|Bla          |5        |noise|
|Bla          |6        |ende |
|Bla          |7        |start|
|Bla          |8        |start|
|Bla          |9        |noise|
|Bla          |10       |noise|
|Bla          |11       |start|
|Bla          |12       |ende |
|Bla          |13       |noise|
|Bla          |14       |noise|
|Bla          |15       |start|
|Bla          |16       |ende |
|Bla          |17       |start|
|Bla          |18       |start|
|Bla          |19       |noise|
|Bla          |20       |noise|
|Bla          |21       |ende |
|Bla          |22       |noise|
|Bla          |23       |noise|
|Bla          |24       |ende |
|Bla          |25       |start|
|Bla          |26       |ende |
|Bla          |27       |noise|
|Bla          |28       |ende |
|Bla    

In [8]:
# Reference implementation from pywrangler; its `iids` output is used for
# comparison with the hand-written Spark logic further down.
sequence = interval_identifier.VectorizedCumSum(
    marker_column="state",
    marker_start="start",
    marker_end="ende",
    order_columns=["timestamp"],
    groupby_columns=["machinenumber"],
)

In [9]:
# Apply the reference wrangler to the unfiltered frame. As the displayed output
# shows, it appends an `iids` column and keeps the all-null rows of the CSV.
df_seq = sequence.transform(df)

In [10]:
df_seq.show(n=50, truncate=False)

+-------------+---------+-----+----+
|machinenumber|timestamp|state|iids|
+-------------+---------+-----+----+
|null         |null     |null |0   |
|null         |null     |null |0   |
|null         |null     |null |0   |
|Bla          |1        |ende |0   |
|Bla          |2        |noise|0   |
|Bla          |3        |start|1   |
|Bla          |4        |noise|1   |
|Bla          |5        |noise|1   |
|Bla          |6        |ende |1   |
|Bla          |7        |start|0   |
|Bla          |8        |start|0   |
|Bla          |9        |noise|0   |
|Bla          |10       |noise|0   |
|Bla          |11       |start|2   |
|Bla          |12       |ende |2   |
|Bla          |13       |noise|0   |
|Bla          |14       |noise|0   |
|Bla          |15       |start|3   |
|Bla          |16       |ende |3   |
|Bla          |17       |start|0   |
|Bla          |18       |start|4   |
|Bla          |19       |noise|4   |
|Bla          |20       |noise|4   |
|Bla          |21       |ende |4   |
|

### Longest sequence (first start, last ende)

In [53]:
# Per-machine window in time order; shared by the lag, forward-fill and
# running-sum steps below.
w_lag = Window.orderBy("timestamp").partitionBy("machinenumber")

In [54]:
# Encode markers numerically: start -> 1, ende -> 0, anything else -> null
# (F.when without .otherwise already yields null).
df = df.withColumn(
    "start_ende",
    F.when(F.col("state") == "start", 1).when(F.col("state") == "ende", 0),
)
# Forward-fill: carry the most recent non-null marker down to each row of a
# machine. Window.unboundedPreceding is the documented way to express the
# frame start (instead of the -sys.maxsize magic value).
filled_col = F.last(F.col("start_ende"), ignorenulls=True).over(
    w_lag.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = df.withColumn("start_ende_ffill", filled_col)

In [55]:
# Drop rows without a machine id (they show up as all-null rows in the
# pywrangler output above).
df = df.dropna(subset=["machinenumber"])

In [58]:
# Previous row's forward-filled marker per machine; 0 means "not inside an open
# interval". lag's default (0) only covers the first row of each machine, so
# coalesce also maps rows that have no forward-filled marker yet (leading rows
# before any start/ende) to 0 -- otherwise a start following them would compare
# null == 0 and never open a series.
# Offset and default are passed positionally: the keyword is `count` in
# PySpark 2.x but `offset` in 3.x, so `count=1` breaks on newer versions.
prev_ffill = F.lag("start_ende_ffill", 1, 0).over(w_lag)
df = df.withColumn("ffill_shift", F.coalesce(prev_ffill, F.lit(0)).cast(IntegerType()))

In [62]:
# A "start" whose previous forward-filled marker is 0 opens a new series.
opens_series = (F.col("state") == "start") & (F.col("ffill_shift") == 0)
df = df.withColumn("start_and_ffill_shift", F.when(opens_series, 1).otherwise(0))
# Running count of series openings = series id of each row.
df = df.withColumn("series", F.sum("start_and_ffill_shift").over(w_lag))

In [78]:
# Longest sequence (first start, last ende).
# Step 1: candidate id = series id, except noise rows that do not open a
# series, which become null so they can be back-filled.
df = df.withColumn(
    "f_start_l_ende",
    F.when((F.col("state") == "noise") & (F.col("start_and_ffill_shift") == 0), None)
    .otherwise(F.col("series")),
)
# Step 2: back-fill within each (machine, series) from the next non-null row.
# Window.currentRow / Window.unboundedFollowing replace the (0, sys.maxsize)
# magic frame bounds.
bfill_col = F.first(F.col("f_start_l_ende"), ignorenulls=True).over(
    Window.partitionBy("machinenumber", "series")
    .orderBy("timestamp")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
df = df.withColumn("f_start_l_ende", bfill_col)
# Step 3: rows with nothing left to back-fill from (trailing noise of a series)
# become 0; every other row takes its series id.
df = df.withColumn(
    "f_start_l_ende",
    F.when(F.col("f_start_l_ende").isNull(), 0).otherwise(F.col("series")),
)

In [79]:
# Inspect every intermediate column of the first-start/last-ende computation.
f_start_l_ende_cols = [
    "machinenumber", "timestamp", "state", "start_ende", "start_ende_ffill",
    "ffill_shift", "start_and_ffill_shift", "series", "f_start_l_ende",
]
df.select(*f_start_l_ende_cols).show(n=50, truncate=False)

+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|machinenumber|timestamp|state|start_ende|start_ende_ffill|ffill_shift|start_and_ffill_shift|series|f_start_l_ende|
+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|Bla          |1        |ende |0         |0               |1          |0                    |0     |0             |
|Bla          |2        |noise|null      |0               |0          |0                    |0     |0             |
|Bla          |3        |start|1         |1               |0          |1                    |1     |1             |
|Bla          |4        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |5        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |6        |ende |0         |0               |1          |0

### First start, first ende

In [83]:
# First start, first ende: a row whose forward-filled marker is ende (0) and
# whose previous row's forward-filled marker was also 0 lies outside any
# interval and gets 0; every other row keeps its series id.
outside_interval = (F.col("start_ende_ffill") == 0) & (F.col("ffill_shift") == 0)
df = df.withColumn("f_start_f_ende", F.when(outside_interval, 0).otherwise(F.col("series")))

In [84]:
# Inspect the intermediate columns next to the first-start/first-ende result.
f_start_f_ende_cols = [
    "machinenumber", "timestamp", "state", "start_ende", "start_ende_ffill",
    "ffill_shift", "start_and_ffill_shift", "series", "f_start_f_ende",
]
df.select(*f_start_f_ende_cols).show(n=50, truncate=False)

+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|machinenumber|timestamp|state|start_ende|start_ende_ffill|ffill_shift|start_and_ffill_shift|series|f_start_f_ende|
+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|Bla          |1        |ende |0         |0               |1          |0                    |0     |0             |
|Bla          |2        |noise|null      |0               |0          |0                    |0     |0             |
|Bla          |3        |start|1         |1               |0          |1                    |1     |1             |
|Bla          |4        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |5        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |6        |ende |0         |0               |1          |0

### Last start, last ende

In [85]:
# Last start, last ende -- back-fill step.
# FIXME(review): this cell (together with the isNull -> 0 step that follows) is a
# verbatim copy of the first-start/last-ende logic, so `l_start_l_ende` ends up
# identical to `f_start_l_ende`; nothing here drops the earlier starts of a run.
# Compare the `start_shift-` based approach further down.
df = df.withColumn(
    "l_start_l_ende",
    F.when((F.col("state") == "noise") & (F.col("start_and_ffill_shift") == 0), None)
    .otherwise(F.col("series")),
)
# Back-fill within each (machine, series); named frame bounds replace the
# (0, sys.maxsize) magic values.
bfill_col = F.first(F.col("l_start_l_ende"), ignorenulls=True).over(
    Window.partitionBy("machinenumber", "series")
    .orderBy("timestamp")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
df = df.withColumn("l_start_l_ende", bfill_col)

In [87]:
# Rows that could not be back-filled (trailing noise) -> 0, all others -> series id.
df = df.withColumn(
    "l_start_l_ende",
    F.when(F.col("l_start_l_ende").isNull(), 0).otherwise(F.col("series")),
)

In [93]:
# Running "has a start occurred yet in this series" flag per (machine, series).
# With an ordered window the default frame runs from the partition start to the
# current row, so this is a cumulative max over the boolean.
# Fix: compare the `state` column -- `timestamp` is an int column, so comparing
# it to "start" can never be true.
max_start = F.max(F.col("state") == "start").over(
    Window.partitionBy("machinenumber", "series").orderBy("timestamp")
)

In [94]:
# Materialize the running start flag as a column.
df = df.withColumn(colName="max_start", col=max_start)

In [96]:
df.select(["machinenumber", "timestamp", "state", "max_start"]).show(n=50, truncate=False)

+-------------+---------+-----+---------+
|machinenumber|timestamp|state|max_start|
+-------------+---------+-----+---------+
|Bla          |1        |ende |false    |
|Bla          |2        |noise|false    |
|Bla          |3        |start|true     |
|Bla          |4        |noise|true     |
|Bla          |5        |noise|true     |
|Bla          |6        |ende |true     |
|Bla          |7        |start|true     |
|Bla          |8        |start|true     |
|Bla          |9        |noise|true     |
|Bla          |10       |noise|true     |
|Bla          |11       |start|true     |
|Bla          |12       |ende |true     |
|Bla          |13       |noise|true     |
|Bla          |14       |noise|true     |
|Bla          |15       |start|true     |
|Bla          |16       |ende |true     |
|Bla          |17       |start|true     |
|Bla          |18       |start|true     |
|Bla          |19       |noise|true     |
|Bla          |20       |noise|true     |
|Bla          |21       |ende |tru

In [88]:
# Inspect the intermediate columns next to the last-start/last-ende result.
l_start_l_ende_cols = [
    "machinenumber", "timestamp", "state", "start_ende", "start_ende_ffill",
    "ffill_shift", "start_and_ffill_shift", "series", "l_start_l_ende",
]
df.select(*l_start_l_ende_cols).show(n=50, truncate=False)

+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|machinenumber|timestamp|state|start_ende|start_ende_ffill|ffill_shift|start_and_ffill_shift|series|l_start_l_ende|
+-------------+---------+-----+----------+----------------+-----------+---------------------+------+--------------+
|Bla          |1        |ende |0         |0               |1          |0                    |0     |0             |
|Bla          |2        |noise|null      |0               |0          |0                    |0     |0             |
|Bla          |3        |start|1         |1               |0          |1                    |1     |1             |
|Bla          |4        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |5        |noise|null      |1               |1          |0                    |1     |1             |
|Bla          |6        |ende |0         |0               |1          |0

### First start, first ende (variant using start/ende indicator columns)

In [177]:
# Initialize the first-start/first-ende result from the series column `ser`.
# FIXME(review): `ser` (and `start`, `ende`, `ende_shift` used in the following
# cells) are never created in this notebook -- they come from deleted cells, so
# this section fails after Restart & Run All. The dtypes listing below shows them
# as int columns. The displayed outputs also show a different dataset (no
# "noise" rows). Judging by those outputs, `ende_shift` looks like lag(ende)
# with default 1 and `ser` like a running count of starts that follow an ende
# -- TODO confirm and restore the defining cells.
df = df.withColumn("ser_f_start_f_ende", df.ser.cast(IntegerType()))

In [178]:
# Set the id to 0 where ende AND ende_shift are both 1 (bitwise AND on 0/1 ints),
# presumably an ende directly preceded by another ende, i.e. outside an
# interval -- TODO confirm ende_shift semantics. Other rows keep the id.
# NOTE(review): relies on `ende`/`ende_shift` columns not defined in this notebook.
df = df.withColumn("ser_f_start_f_ende", F.when(df.ende.bitwiseAND(df.ende_shift) == 1, 0)\
                                          .otherwise(df.ser_f_start_f_ende.cast(IntegerType())))

In [213]:
# Show the first-start/first-ende ids next to the indicator columns they are built from.
# NOTE(review): `start`, `ende`, `ende_shift` are not defined by any visible cell.
df.select("machinenumber", "timestamp", "state", "ser_f_start_f_ende", "start", "ende", "ende_shift").show(20, False)

+-------------+---------+-----+------------------+-----+----+----------+
|machinenumber|timestamp|state|ser_f_start_f_ende|start|ende|ende_shift|
+-------------+---------+-----+------------------+-----+----+----------+
|Bla          |1        |ende |0                 |0    |1   |1         |
|Bla          |2        |start|1                 |1    |0   |1         |
|Bla          |3        |ende |1                 |0    |1   |0         |
|Bla          |4        |start|2                 |1    |0   |1         |
|Bla          |5        |start|2                 |1    |0   |0         |
|Bla          |6        |start|2                 |1    |0   |0         |
|Bla          |7        |ende |2                 |0    |1   |0         |
|Bla          |8        |start|3                 |1    |0   |1         |
|Bla          |9        |ende |3                 |0    |1   |0         |
|Bla          |10       |start|4                 |1    |0   |1         |
|Bla          |11       |start|4                 |1

In [181]:
# Ensure `ser` is an int column (it is listed as 'int' in the dtypes output below).
# NOTE(review): `ser` itself is created by a cell that is no longer in the notebook.
df = df.withColumn("ser", df.ser.cast(IntegerType()))

In [182]:
# List column names and Spark types of the current frame.
df.dtypes

[('machinenumber', 'string'),
 ('timestamp', 'int'),
 ('state', 'string'),
 ('start', 'int'),
 ('ende', 'int'),
 ('ende_shift', 'int'),
 ('start_ende', 'int'),
 ('start_ende_shift', 'int'),
 ('ser', 'int'),
 ('ser_f_start_f_ende', 'int')]

### Last start, last ende (variant using start/ende indicator columns)

In [200]:
# Next row's `start` flag per machine (0 after the last row of a machine).
# F.lead(col, 1, 0) is equivalent to the former F.lag(col, count=-1, default=0)
# but avoids the `count` keyword, which PySpark 3.x renamed to `offset`.
# Also seed the last-start/last-ende id from `ser`.
# FIXME(review): `start` and `ser` are not defined by any visible cell (deleted
# cells); see the dtypes listing above.
df = df.withColumn("start_shift-", F.lead(df.start, 1, 0).over(w_lag).cast(IntegerType()))\
       .withColumn("ser_l_start_l_ende", df.ser.cast(IntegerType()))

In [201]:
# Last start, last ende: a start immediately followed by another start
# (start_shift- == 1) is not the last start of its run, so its id becomes 0;
# all other rows keep `ser`.
df = df.withColumn("ser_l_start_l_ende", F.when((F.col("start") == 1) & \
                                                (F.col("start_shift-") == 1), 0)\
                                          .otherwise(df.ser.cast(IntegerType())))

In [207]:
df.select(["machinenumber", "timestamp", "state", "ser_l_start_l_ende"]).show(n=20, truncate=False)

+-------------+---------+-----+------------------+
|machinenumber|timestamp|state|ser_l_start_l_ende|
+-------------+---------+-----+------------------+
|Bla          |1        |ende |0                 |
|Bla          |2        |start|1                 |
|Bla          |3        |ende |1                 |
|Bla          |4        |start|0                 |
|Bla          |5        |start|0                 |
|Bla          |6        |start|2                 |
|Bla          |7        |ende |2                 |
|Bla          |8        |start|3                 |
|Bla          |9        |ende |3                 |
|Bla          |10       |start|0                 |
|Bla          |11       |start|4                 |
|Bla          |12       |ende |4                 |
|Bla          |13       |ende |4                 |
|Bla          |14       |start|5                 |
|Bla          |15       |ende |5                 |
|Bla          |16       |ende |5                 |
|Bla          |17       |ende |

### Last start, first ende

In [203]:
# Seed the last-start/first-ende id from `ser`.
# NOTE(review): redundant -- the next cell overwrites this column entirely,
# taking `ser` in its .otherwise branch.
df = df.withColumn("ser_l_start_f_ende", df.ser.cast(IntegerType()))

In [204]:
# Last start, first ende: id 0 for
#   - a start immediately followed by another start (not the last start of its run), and
#   - an ende with ende_shift == 1 (presumably an ende directly after an ende -- TODO confirm);
# every other row keeps `ser`.
df = df.withColumn("ser_l_start_f_ende", F.when((F.col("start") == 1) & \
                                                (F.col("start_shift-") == 1), 0)\
                                          .when((F.col("ende") == 1) & \
                                                (F.col("ende_shift") == 1), 0)\
                                          .otherwise(df.ser.cast(IntegerType())))

In [208]:
df.select(["machinenumber", "timestamp", "state", "ser_l_start_f_ende"]).show(n=20, truncate=False)

+-------------+---------+-----+------------------+
|machinenumber|timestamp|state|ser_l_start_f_ende|
+-------------+---------+-----+------------------+
|Bla          |1        |ende |0                 |
|Bla          |2        |start|1                 |
|Bla          |3        |ende |1                 |
|Bla          |4        |start|0                 |
|Bla          |5        |start|0                 |
|Bla          |6        |start|2                 |
|Bla          |7        |ende |2                 |
|Bla          |8        |start|3                 |
|Bla          |9        |ende |3                 |
|Bla          |10       |start|0                 |
|Bla          |11       |start|4                 |
|Bla          |12       |ende |4                 |
|Bla          |13       |ende |0                 |
|Bla          |14       |start|5                 |
|Bla          |15       |ende |5                 |
|Bla          |16       |ende |0                 |
|Bla          |17       |ende |

In [209]:
# Reference ids from pywrangler for side-by-side comparison with the results above.
df_seq.select(["machinenumber", "timestamp", "iids"]).show(n=20, truncate=False)

+-------------+---------+----+
|machinenumber|timestamp|iids|
+-------------+---------+----+
|Bla          |1        |0   |
|Bla          |2        |1   |
|Bla          |3        |1   |
|Bla          |4        |0   |
|Bla          |5        |0   |
|Bla          |6        |2   |
|Bla          |7        |2   |
|Bla          |8        |3   |
|Bla          |9        |3   |
|Bla          |10       |0   |
|Bla          |11       |4   |
|Bla          |12       |4   |
|Bla          |13       |0   |
|Bla          |14       |5   |
|Bla          |15       |5   |
|Bla          |16       |0   |
|Bla          |17       |0   |
|Bla          |18       |6   |
|Bla          |19       |6   |
+-------------+---------+----+

