In [22]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f, Window as W

# Column names for the sample events, in the order each row lists its values.
_cols = ["ID", "sourceName", "eventData"]

# Ten sample events; each row is [ID, sourceName, eventData].
_data = [
    [1, "GS3", 1],
    [2, "GS2", 1],
    [3, "GS2", 8],
    [4, "GS1", 1],
    [5, "GS2", 2],
    [6, "ABC", 0],
    [7, "B123", 0],
    [8, "B423", 0],
    [9, "PTSD", 168],
    [10, "XCD", 0],
]

# Reuse the active session if one exists, and keep Spark's logging to errors only.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = spark.createDataFrame(_data, _cols)
df.show()

+---+----------+---------+
| ID|sourceName|eventData|
+---+----------+---------+
|  1|       GS3|        1|
|  2|       GS2|        1|
|  3|       GS2|        8|
|  4|       GS1|        1|
|  5|       GS2|        2|
|  6|       ABC|        0|
|  7|      B123|        0|
|  8|      B423|        0|
|  9|      PTSD|      168|
| 10|       XCD|        0|
+---+----------+---------+



In [23]:
# Keep eventData on GS2 rows only; every other source contributes 0.
is_gs2 = f.col("sourceName") == "GS2"
gs2_event_value = f.when(is_gs2, f.col("eventData")).otherwise(0)
df = df.withColumn("testValveOpened", gs2_event_value)
df.show()

+---+----------+---------+---------------+
| ID|sourceName|eventData|testValveOpened|
+---+----------+---------+---------------+
|  1|       GS3|        1|              0|
|  2|       GS2|        1|              1|
|  3|       GS2|        8|              8|
|  4|       GS1|        1|              0|
|  5|       GS2|        2|              2|
|  6|       ABC|        0|              0|
|  7|      B123|        0|              0|
|  8|      B423|        0|              0|
|  9|      PTSD|      168|              0|
| 10|       XCD|        0|              0|
+---+----------+---------+---------------+



In [24]:
# Running total, in ID order, of the eventData carried by GS2 rows.
# The per-row value is derived from sourceName/eventData instead of being read
# back from "testValveOpened": this cell overwrites that column, so summing it
# in place would cumulate already-cumulated values if the cell were run again.
window = W.orderBy("ID")
gs2_event_value = f.when(f.col("sourceName") == "GS2", f.col("eventData")).otherwise(0)
df = df.withColumn("testValveOpened", f.sum(gs2_event_value).over(window))
df.show()

+---+----------+---------+---------------+
| ID|sourceName|eventData|testValveOpened|
+---+----------+---------+---------------+
|  1|       GS3|        1|              0|
|  2|       GS2|        1|              1|
|  3|       GS2|        8|              9|
|  4|       GS1|        1|              9|
|  5|       GS2|        2|             11|
|  6|       ABC|        0|             11|
|  7|      B123|        0|             11|
|  8|      B423|        0|             11|
|  9|      PTSD|      168|             11|
| 10|       XCD|        0|             11|
+---+----------+---------+---------------+



In [27]:
# Carry the eventData of the most recent GS2 row forward onto every row, in ID order.
# The group key is rebuilt from sourceName instead of being read back from
# "testValveOpened" (the column this cell overwrites), so re-running the cell
# gives the same result.
is_gs2 = f.col("sourceName") == "GS2"

# Number of GS2 rows seen so far. Every GS2 row opens a new group, including one
# whose eventData is 0, which a running sum of eventData would not separate
# from the rows before it.
gs2_seen = f.sum(f.when(is_gs2, 1).otherwise(0)).over(W.orderBy("ID"))

# NOTE: rows that precede the first GS2 row share group 0 and therefore take the
# eventData of the lowest-ID row, whatever its source.
df = (
    df.withColumn("_valveGroup", gs2_seen)
    .withColumn(
        "testValveOpened",
        f.first("eventData").over(W.partitionBy("_valveGroup").orderBy("ID")),
    )
    .drop("_valveGroup")  # helper column only; keep the displayed schema unchanged
)
df.show()

+---+----------+---------+---------------+
| ID|sourceName|eventData|testValveOpened|
+---+----------+---------+---------------+
|  1|       GS3|        1|              1|
|  2|       GS2|        1|              1|
|  3|       GS2|        8|              8|
|  4|       GS1|        1|              8|
|  5|       GS2|        2|              2|
|  6|       ABC|        0|              2|
|  7|      B123|        0|              2|
|  8|      B423|        0|              2|
|  9|      PTSD|      168|              2|
| 10|       XCD|        0|              2|
+---+----------+---------+---------------+

