In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Reuse the active SparkSession if one exists; otherwise build a new one
# named "Window-example".
spark = (
    SparkSession.builder
    .appName("Window-example")
    .getOrCreate()
)

In [2]:
# Toy dataset: rows grouped by partition_col (1 and 2), ordered by order_col,
# each carrying a numeric value to accumulate.
columns = ["partition_col", "order_col", "value"]
data = [
    (1, "A", 10),
    (1, "B", 20),
    (2, "A", 30),
    (2, "B", 40),
    (2, "C", 50),
]
df = spark.createDataFrame(data, schema=columns)
df.show()

+-------------+---------+-----+
|partition_col|order_col|value|
+-------------+---------+-----+
|            1|        A|   10|
|            1|        B|   20|
|            2|        A|   30|
|            2|        B|   40|
|            2|        C|   50|
+-------------+---------+-----+



In [3]:
# Define a window specification for a per-partition running total.
window_spec = (
    Window
    .partitionBy("partition_col")  # compute independently within each partition_col value
    .orderBy("order_col")          # accumulate rows in order_col order
    # Make the frame explicit. With orderBy but no frame, Spark defaults to
    # RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which gives every row
    # that ties on order_col the same combined total instead of a row-by-row
    # running total. NOTE: with a ROWS frame, the relative order of tied rows
    # is not guaranteed; add a tiebreaker column to orderBy if that matters.
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
window_spec

<pyspark.sql.window.WindowSpec at 0x104fbdd30>

In [4]:
# Running total within each partition: sum of `value` evaluated over window_spec.
running_total_col = F.sum("value").over(window_spec)
result = df.withColumn("running_total", running_total_col)

result.show()

+-------------+---------+-----+-------------+
|partition_col|order_col|value|running_total|
+-------------+---------+-----+-------------+
|            1|        A|   10|           10|
|            1|        B|   20|           30|
|            2|        A|   30|           30|
|            2|        B|   40|           70|
|            2|        C|   50|          120|
+-------------+---------+-----+-------------+



In [9]:
# Transactions per person; timestamps are ISO-8601 strings with an explicit
# +00:00 (UTC) offset.
transactions = [
    ("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"),
    ("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),
    ("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"),
    ("thabo", 20.00, "2018-03-13T15:27:18+00:00"),
    ("thabo", 56.00, "2018-03-14T12:27:18+00:00"),
    ("thabo", 99.00, "2018-03-15T11:27:18+00:00"),
    ("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"),
    ("thabo", 122.00, "2018-03-31T11:27:18+00:00"),
    ("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
    ("ash", 9999.00, "2018-04-16T11:27:18+00:00"),
]
df = spark.createDataFrame(transactions, ["name", "dollars", "timestampGMT"])

# Parse the strings into a real TimestampType column. The range window defined
# later casts this column to long (epoch seconds) so its frame can be expressed
# in seconds. show() renders timestamps in the Spark session time zone, which
# is why the displayed times are shifted from the +00:00 inputs.
df = df.withColumn('timestampGMT', F.col('timestampGMT').cast('timestamp'))

df.show(n=20)

+---------+-------+-------------------+
|     name|dollars|       timestampGMT|
+---------+-------+-------------------+
|tshilidzi|   17.0|2018-03-10 16:27:18|
|tshilidzi|   13.0|2018-03-11 13:27:18|
|tshilidzi|   25.0|2018-03-12 12:27:18|
|    thabo|   20.0|2018-03-13 16:27:18|
|    thabo|   56.0|2018-03-14 13:27:18|
|    thabo|   99.0|2018-03-15 12:27:18|
|tshilidzi|  156.0|2019-03-22 12:27:18|
|    thabo|  122.0|2018-03-31 13:27:18|
|tshilidzi| 7000.0|2019-04-15 13:27:18|
|      ash| 9999.0|2018-04-16 13:27:18|
+---------+-------+-------------------+



In [18]:
# `Window` is not re-imported here: the first cell already imports it
# (pyspark.sql.Window and pyspark.sql.window.Window are the same class).

SECONDS_PER_DAY = 86_400


def days(i: int) -> int:
    """Convert a whole number of days to seconds.

    The range-based window below orders rows by a timestamp cast to ``long``
    (epoch seconds), so its ``rangeBetween`` offsets must be given in seconds.
    """
    return i * SECONDS_PER_DAY
# Per-name trailing 2-day frame: for each row, include the rows of the same
# `name` whose timestamp lies between 2 days before the current row and the
# current row (inclusive). Ordering by the timestamp cast to long (epoch
# seconds) means the rangeBetween offsets are measured in seconds.
two_day_frame_start = -days(2)
window = (
    Window.partitionBy("name")
    .orderBy(F.col("timestampGMT").cast('long'))
    .rangeBetween(two_day_frame_start, Window.currentRow)
)

In [20]:
# Attach the trailing 2-day dollar sum per name, evaluated over `window`.
two_day_sum = F.sum('dollars').over(window)
df = df.withColumn('sum_two_days', two_day_sum)
df.show()

+---------+-------+-------------------+------------+
|     name|dollars|       timestampGMT|sum_two_days|
+---------+-------+-------------------+------------+
|      ash| 9999.0|2018-04-16 13:27:18|      9999.0|
|    thabo|   20.0|2018-03-13 16:27:18|        20.0|
|    thabo|   56.0|2018-03-14 13:27:18|        76.0|
|    thabo|   99.0|2018-03-15 12:27:18|       175.0|
|    thabo|  122.0|2018-03-31 13:27:18|       122.0|
|tshilidzi|   17.0|2018-03-10 16:27:18|        17.0|
|tshilidzi|   13.0|2018-03-11 13:27:18|        30.0|
|tshilidzi|   25.0|2018-03-12 12:27:18|        55.0|
|tshilidzi|  156.0|2019-03-22 12:27:18|       156.0|
|tshilidzi| 7000.0|2019-04-15 13:27:18|      7000.0|
+---------+-------+-------------------+------------+

