Wykorzystaj przykłady z notatnika SQL „Windowed Aggregate Functions” (cmd 11) i przepisz funkcje, używając Spark API.


In [0]:
from pyspark.sql.functions import *

In [0]:
# Sample transactions for three accounts; TranDate is kept as a 'YYYY-MM-DD' string.
transaction_columns = ["AccountId", "TranDate", "TranAmt"]
transaction_rows = [
    # account 1
    (1, '2011-01-01', 500),
    (1, '2011-01-15', 50),
    (1, '2011-01-22', 250),
    (1, '2011-01-24', 75),
    (1, '2011-01-26', 125),
    (1, '2011-01-28', 175),
    # account 2
    (2, '2011-01-01', 500),
    (2, '2011-01-15', 50),
    (2, '2011-01-22', 25),
    (2, '2011-01-23', 125),
    (2, '2011-01-26', 200),
    (2, '2011-01-29', 250),
    # account 3
    (3, '2011-01-01', 500),
    (3, '2011-01-15', 50),
    (3, '2011-01-22', 5000),
    (3, '2011-01-25', 550),
    (3, '2011-01-27', 95),
    (3, '2011-01-30', 2500),
]

df1 = spark.createDataFrame(transaction_rows, transaction_columns)

# Sample employee salaries; note that some Salary values repeat (1250 and 3000).
salary_columns = ["RowID", "FName", "Salary"]
salary_rows = [
    (1, 'George', 800),
    (2, 'Sam', 950),
    (3, 'Diane', 1100),
    (4, 'Nicholas', 1250),
    (5, 'Samuel', 1250),
    (6, 'Patricia', 1300),
    (7, 'Brian', 1500),
    (8, 'Thomas', 1600),
    (9, 'Fran', 2450),
    (10, 'Debbie', 2850),
    (11, 'Mark', 2975),
    (12, 'James', 3000),
    (13, 'Cynthia', 3000),
    (14, 'Christopher', 5000),
]

df2 = spark.createDataFrame(salary_rows, salary_columns)
     

In [0]:
from pyspark.sql.window import Window

# Per-account window in transaction-date order. With an ordering and no explicit
# frame, the aggregate accumulates from the start of the partition to the current row.
window_spec = Window.partitionBy("AccountId").orderBy("TranDate")

# `sum` is the pyspark.sql.functions aggregate (brought in by the notebook's
# wildcard import), not the Python builtin.
df1_with_total = df1.select(
    "*",
    sum("TranAmt").over(window_spec).alias("RunTotalAmt"),
)

display(df1_with_total)

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500,500
1,2011-01-15,50,550
1,2011-01-22,250,800
1,2011-01-24,75,875
1,2011-01-26,125,1000
1,2011-01-28,175,1175
2,2011-01-01,500,500
2,2011-01-15,50,550
2,2011-01-22,25,575
2,2011-01-23,125,700


In [0]:
# Running (cumulative) statistics per account, in transaction-date order.
window_spec = Window.partitionBy("AccountId").orderBy("TranDate")

# `min`, `max` and `sum` below are the pyspark.sql.functions versions made
# available by the notebook's wildcard import, not the Python builtins.
df_result = (
    df1.select(
        "*",
        avg("TranAmt").over(window_spec).alias("RunAvg"),        # running average
        count("*").over(window_spec).alias("RunTranQty"),        # running row count
        min("TranAmt").over(window_spec).alias("RunSmallAmt"),   # smallest amount so far
        max("TranAmt").over(window_spec).alias("RunLargeAmt"),   # largest amount so far
        sum("TranAmt").over(window_spec).alias("RunTotalAmt"),   # running total
    )
    .orderBy("AccountId", "TranDate")
)

display(df_result)

AccountId,TranDate,TranAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt,RunTotalAmt
1,2011-01-01,500,500.0,1,500,500,500
1,2011-01-15,50,275.0,2,50,500,550
1,2011-01-22,250,266.6666666666667,3,50,500,800
1,2011-01-24,75,218.75,4,50,500,875
1,2011-01-26,125,200.0,5,50,500,1000
1,2011-01-28,175,195.83333333333331,6,50,500,1175
2,2011-01-01,500,500.0,1,500,500,500
2,2011-01-15,50,275.0,2,50,500,550
2,2011-01-22,25,191.66666666666663,3,25,500,575
2,2011-01-23,125,175.0,4,25,500,700


In [0]:
# Sliding frame: the current row plus the two rows before it, per account,
# in transaction-date order.
sliding_window = (
    Window.partitionBy("AccountId")
    .orderBy("TranDate")
    .rowsBetween(-2, Window.currentRow)
)

# Unframed window used only to number the rows within each account.
rownum_window = Window.partitionBy("AccountId").orderBy("TranDate")

df_sliding = (
    df1.select(
        "*",
        avg("TranAmt").over(sliding_window).alias("SlideAvg"),
        count("*").over(sliding_window).alias("SlideQty"),
        min("TranAmt").over(sliding_window).alias("SlideMin"),
        max("TranAmt").over(sliding_window).alias("SlideMax"),
        sum("TranAmt").over(sliding_window).alias("SlideTotal"),
        row_number().over(rownum_window).alias("RN"),
    )
    .orderBy("AccountId", "TranDate", "RN")
)

display(df_sliding)

AccountId,TranDate,TranAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal,RN
1,2011-01-01,500,500.0,1,500,500,500,1
1,2011-01-15,50,275.0,2,50,500,550,2
1,2011-01-22,250,266.6666666666667,3,50,500,800,3
1,2011-01-24,75,125.0,3,50,250,375,4
1,2011-01-26,125,150.0,3,75,250,450,5
1,2011-01-28,175,125.0,3,75,175,375,6
2,2011-01-01,500,500.0,1,500,500,500,1
2,2011-01-15,50,275.0,2,50,500,550,2
2,2011-01-22,25,191.66666666666663,3,25,500,575,3
2,2011-01-23,125,66.66666666666667,3,25,125,200,4


In [0]:
# Compare cumulative sums computed with a ROWS frame and with a RANGE frame.
#
# ROWS counts physical rows, so rows that share a Salary get different sums
# depending on their position in the sort. Salary has ties in df2, so RowID is
# added as a tie-breaker to make that position (and therefore SumByRows)
# deterministic across runs.
rows_window = (
    Window.orderBy("Salary", "RowID")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# RANGE treats all rows with the same Salary as peers of the current row, so
# tied rows receive the same cumulative sum. It is ordered by Salary only on
# purpose: adding a tie-breaker here would remove the peer behaviour being shown.
range_window = (
    Window.orderBy("Salary")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# Neither window is partitioned, so each sum runs over the whole DataFrame.
# `sum` is the pyspark.sql.functions aggregate from the notebook's wildcard import.
df_range_rows = (
    df2
    .withColumn("SumByRows", sum("Salary").over(rows_window))
    .withColumn("SumByRange", sum("Salary").over(range_window))
    .orderBy("RowID")
)

display(df_range_rows.select("RowID", "FName", "Salary", "SumByRows", "SumByRange"))

RowID,FName,Salary,SumByRows,SumByRange
1,George,800,800,800
2,Sam,950,1750,1750
3,Diane,1100,2850,2850
4,Nicholas,1250,4100,5350
5,Samuel,1250,5350,5350
6,Patricia,1300,6650,6650
7,Brian,1500,8150,8150
8,Thomas,1600,9750,9750
9,Fran,2450,12200,12200
10,Debbie,2850,15050,15050


Użycie funkcji okienkowych LEAD, LAG, FIRST_VALUE, LAST_VALUE, ROW_NUMBER 

In [0]:
# Per-account window in transaction-date order, shared by all functions below.
win = Window.partitionBy("AccountId").orderBy("TranDate")

df_windowed = df1.select(
    "*",
    # TranAmt from the previous row of the same account (null on the first row)
    lag("TranAmt", 1).over(win).alias("PrevTranAmt"),
    # TranAmt from the next row of the same account (null on the last row)
    lead("TranAmt", 1).over(win).alias("NextTranAmt"),
    # first transaction amount of the account
    first("TranAmt").over(win).alias("FirstTranAmt"),
    # last transaction amount seen up to and including the current row; the
    # window has no explicit frame, so this is the current row's amount
    last("TranAmt").over(win).alias("LastTranAmt"),
    # sequential number of each transaction within the account
    row_number().over(win).alias("RowNum"),
)
display(df_windowed)

AccountId,TranDate,TranAmt,PrevTranAmt,NextTranAmt,FirstTranAmt,LastTranAmt,RowNum
1,2011-01-01,500,,50.0,500,500,1
1,2011-01-15,50,500.0,250.0,500,50,2
1,2011-01-22,250,50.0,75.0,500,250,3
1,2011-01-24,75,250.0,125.0,500,75,4
1,2011-01-26,125,75.0,175.0,500,125,5
1,2011-01-28,175,125.0,,500,175,6
2,2011-01-01,500,,50.0,500,500,1
2,2011-01-15,50,500.0,25.0,500,50,2
2,2011-01-22,25,50.0,125.0,500,25,3
2,2011-01-23,125,25.0,200.0,500,125,4
