In [0]:
data = [
    (1, '2011-01-01', 500), (1, '2011-01-15', 50), (1, '2011-01-22', 250),
    (1, '2011-01-24', 75), (1, '2011-01-26', 125), (1, '2011-01-28', 175),
    (2, '2011-01-01', 500), (2, '2011-01-15', 50), (2, '2011-01-22', 25),
    (2, '2011-01-23', 125), (2, '2011-01-26', 200), (2, '2011-01-29', 250),
    (3, '2011-01-01', 500), (3, '2011-01-15', 50), (3, '2011-01-22', 5000),
    (3, '2011-01-25', 550), (3, '2011-01-27', 95), (3, '2011-01-30', 2500)
]

df = spark.createDataFrame(data=data, schema = ['RowID','FName','Salary'])
df.show()

+-----+----------+------+
|RowID|     FName|Salary|
+-----+----------+------+
|    1|2011-01-01|   500|
|    1|2011-01-15|    50|
|    1|2011-01-22|   250|
|    1|2011-01-24|    75|
|    1|2011-01-26|   125|
|    1|2011-01-28|   175|
|    2|2011-01-01|   500|
|    2|2011-01-15|    50|
|    2|2011-01-22|    25|
|    2|2011-01-23|   125|
|    2|2011-01-26|   200|
|    2|2011-01-29|   250|
|    3|2011-01-01|   500|
|    3|2011-01-15|    50|
|    3|2011-01-22|  5000|
|    3|2011-01-25|   550|
|    3|2011-01-27|    95|
|    3|2011-01-30|  2500|
+-----+----------+------+



In [0]:
logical_data = [
(1,'George', 800),
(2,'Sam', 950),
(3,'Diane', 1100),
(4,'Nicholas', 1250),
(5,'Samuel', 1250),
(6,'Patricia', 1300),
(7,'Brian', 1500),
(8,'Thomas', 1600),
(9,'Fran', 2450),
(10,'Debbie', 2850),
(11,'Mark', 2975),
(12,'James', 3000),
(13,'Cynthia', 3000),
(14,'Christopher', 5000)
]

logical_df = spark.createDataFrame(logical_data, ["RowID", "FName", "Salary"])
display(logical_df)

RowID,FName,Salary
1,George,800
2,Sam,950
3,Diane,1100
4,Nicholas,1250
5,Samuel,1250
6,Patricia,1300
7,Brian,1500
8,Thomas,1600
9,Fran,2450
10,Debbie,2850


In [0]:
# running total of all transactions
from pyspark.sql.window import Window
import pyspark.sql.functions as f

def add_running_total(df, id_col: str, date_col: str, price_col: str):
    window_spec = Window.partitionBy(id_col).orderBy(date_col).rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    return df.withColumn("RunTotalAmt", f.sum(price_col).over(window_spec))

df_with_total = add_running_total(df, "RowID", "FName", "Salary")
df_with_total.show()


    
    

+-----+----------+------+-----------+
|RowID|     FName|Salary|RunTotalAmt|
+-----+----------+------+-----------+
|    1|2011-01-01|   500|        500|
|    1|2011-01-15|    50|        550|
|    1|2011-01-22|   250|        800|
|    1|2011-01-24|    75|        875|
|    1|2011-01-26|   125|       1000|
|    1|2011-01-28|   175|       1175|
|    2|2011-01-01|   500|        500|
|    2|2011-01-15|    50|        550|
|    2|2011-01-22|    25|        575|
|    2|2011-01-23|   125|        700|
|    2|2011-01-26|   200|        900|
|    2|2011-01-29|   250|       1150|
|    3|2011-01-01|   500|        500|
|    3|2011-01-15|    50|        550|
|    3|2011-01-22|  5000|       5550|
|    3|2011-01-25|   550|       6100|
|    3|2011-01-27|    95|       6195|
|    3|2011-01-30|  2500|       8695|
+-----+----------+------+-----------+



In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f

def fun1(df, id_col: str, date_col: str, trans: str):
    window = Window.partitionBy(id_col).orderBy(date_col)

    return df.withColumn("RunAvg", f.avg(trans).over(window))\
        .withColumn("RunTranQty", f.count("*").over(window))\
        .withColumn("RunMinAmt", f.min(trans).over(window))\
        .withColumn("RunMaxAmt", f.max(trans).over(window))\
        .withColumn("RunTotalAmt", f.sum(trans).over(window))\
        .orderBy(id_col, date_col) 

df_with_total = fun1(df, "RowID", "FName", "Salary")
df_with_total.show()


+-----+----------+------+------------------+----------+---------+---------+-----------+
|RowID|     FName|Salary|            RunAvg|RunTranQty|RunMinAmt|RunMaxAmt|RunTotalAmt|
+-----+----------+------+------------------+----------+---------+---------+-----------+
|    1|2011-01-01|   500|             500.0|         1|      500|      500|        500|
|    1|2011-01-15|    50|             275.0|         2|       50|      500|        550|
|    1|2011-01-22|   250| 266.6666666666667|         3|       50|      500|        800|
|    1|2011-01-24|    75|            218.75|         4|       50|      500|        875|
|    1|2011-01-26|   125|             200.0|         5|       50|      500|       1000|
|    1|2011-01-28|   175|195.83333333333334|         6|       50|      500|       1175|
|    2|2011-01-01|   500|             500.0|         1|      500|      500|        500|
|    2|2011-01-15|    50|             275.0|         2|       50|      500|        550|
|    2|2011-01-22|    25|191.666

In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f


def fun2(df, id_col: str, date_col: str, trans: str):
    window = Window.partitionBy(id_col).orderBy(date_col)
    window2=Window.partitionBy(id_col).orderBy(date_col).rowsBetween(-2, 0)


    return df.withColumn("RunAvg", f.avg(trans).over(window2))\
        .withColumn("SlideQty", f.count("*").over(window2))\
        .withColumn("SlideMin", f.min(trans).over(window2))\
        .withColumn("SlideMax", f.max(trans).over(window2))\
        .withColumn("RowNumber", f.sum(trans).over(window))\
        .orderBy([id_col, date_col,"RowNumber"] ) 

df_with_total = fun2(df, "RowID", "FName", "Salary")
df_with_total.show()


+-----+----------+------+------------------+--------+--------+--------+---------+
|RowID|     FName|Salary|            RunAvg|SlideQty|SlideMin|SlideMax|RowNumber|
+-----+----------+------+------------------+--------+--------+--------+---------+
|    1|2011-01-01|   500|             500.0|       1|     500|     500|      500|
|    1|2011-01-15|    50|             275.0|       2|      50|     500|      550|
|    1|2011-01-22|   250| 266.6666666666667|       3|      50|     500|      800|
|    1|2011-01-24|    75|             125.0|       3|      50|     250|      875|
|    1|2011-01-26|   125|             150.0|       3|      75|     250|     1000|
|    1|2011-01-28|   175|             125.0|       3|      75|     175|     1175|
|    2|2011-01-01|   500|             500.0|       1|     500|     500|      500|
|    2|2011-01-15|    50|             275.0|       2|      50|     500|      550|
|    2|2011-01-22|    25|191.66666666666666|       3|      25|     500|      575|
|    2|2011-01-2

In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f

def fun3(df, id_col: str, name_col: str, salary: str):
    window = Window.orderBy('Salary').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    window2=Window.orderBy('Salary').rangeBetween(Window.unboundedPreceding, Window.currentRow)

    return df.withColumn("SumByRows", f.sum(salary).over(window))\
    .withColumn("SumByRange", f.sum(salary).over(window2))\
    .orderBy(id_col)

df_with_total = fun3(logical_df, "RowID", "FName", "Salary")
df_with_total.show()

+-----+-----------+------+---------+----------+
|RowID|      FName|Salary|SumByRows|SumByRange|
+-----+-----------+------+---------+----------+
|    1|     George|   800|      800|       800|
|    2|        Sam|   950|     1750|      1750|
|    3|      Diane|  1100|     2850|      2850|
|    4|   Nicholas|  1250|     4100|      5350|
|    5|     Samuel|  1250|     5350|      5350|
|    6|   Patricia|  1300|     6650|      6650|
|    7|      Brian|  1500|     8150|      8150|
|    8|     Thomas|  1600|     9750|      9750|
|    9|       Fran|  2450|    12200|     12200|
|   10|     Debbie|  2850|    15050|     15050|
|   11|       Mark|  2975|    18025|     18025|
|   12|      James|  3000|    21025|     24025|
|   13|    Cynthia|  3000|    24025|     24025|
|   14|Christopher|  5000|    29025|     29025|
+-----+-----------+------+---------+----------+

