# Zadanie 3

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, min, max, count
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

data = [
    (1, "2011-01-01", 500),
    (1, "2011-01-15", 50),
    (1, "2011-01-22", 250),
    (1, "2011-01-24", 75),
    (1, "2011-01-26", 125),
    (1, "2011-01-28", 175),
    (2, "2011-01-01", 500),
    (2, "2011-01-15", 50),
    (2, "2011-01-22", 25),
    (2, "2011-01-23", 125),
    (2, "2011-01-26", 200),
    (2, "2011-01-29", 250),
    (3, "2011-01-01", 500),
    (3, "2011-01-15", 50),
    (3, "2011-01-22", 5000),
    (3, "2011-01-25", 550),
    (3, "2011-01-27", 95),
    (3, "2011-01-30", 2500),
]
columns = ["AccountId", "TranDate", "TranAmt"]
df = spark.createDataFrame(data, columns)

from pyspark.sql.functions import to_date
df = df.withColumn("TranDate", to_date(col("TranDate"), "yyyy-MM-dd"))

display(df)



AccountId,TranDate,TranAmt
1,2011-01-01,500
1,2011-01-15,50
1,2011-01-22,250
1,2011-01-24,75
1,2011-01-26,125
1,2011-01-28,175
2,2011-01-01,500
2,2011-01-15,50
2,2011-01-22,25
2,2011-01-23,125


odpowiednik SQL:
`SUM(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunTotalAmt`


In [0]:
window_spec = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Obliczenie bieżącej sumy transakcji
df = df.withColumn("RunTotalAmt", sum("TranAmt").over(window_spec))

display(df)

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500,500
1,2011-01-15,50,550
1,2011-01-22,250,800
1,2011-01-24,75,875
1,2011-01-26,125,1000
1,2011-01-28,175,1175
2,2011-01-01,500,500
2,2011-01-15,50,550
2,2011-01-22,25,575
2,2011-01-23,125,700


Funkcje agregujące

SQL: 
`AVG(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunAvg`,

`COUNT(*) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunTranQty`,

`MIN(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunSmallAmt`,

`MAX(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunLargeAmt`

In [0]:
df1 = df.withColumn("RunAvg", avg("TranAmt").over(window_spec)) \
       .withColumn("RunTranQty", count("TranAmt").over(window_spec)) \
       .withColumn("RunSmallAmt", min("TranAmt").over(window_spec)) \
       .withColumn("RunLargeAmt", max("TranAmt").over(window_spec))

display(df1)


AccountId,TranDate,TranAmt,RunTotalAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt
1,2011-01-01,500,500,500.0,1,500,500
1,2011-01-15,50,550,275.0,2,50,500
1,2011-01-22,250,800,266.6666666666667,3,50,500
1,2011-01-24,75,875,218.75,4,50,500
1,2011-01-26,125,1000,200.0,5,50,500
1,2011-01-28,175,1175,195.83333333333331,6,50,500
2,2011-01-01,500,500,500.0,1,500,500
2,2011-01-15,50,550,275.0,2,50,500
2,2011-01-22,25,575,191.66666666666663,3,25,500
2,2011-01-23,125,700,175.0,4,25,500


Definiowanie okna przesuwnego 

SQL: `AVG(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideAvg`

`COUNT(*) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideQty,`

`MIN(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideMin,`

`MAX(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideMax,`

`SUM(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideTotal`


In [0]:

sliding_window_spec = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(-2, 0)

df2 = df.withColumn("SlideAvg", avg("TranAmt").over(sliding_window_spec)) \
       .withColumn("SlideQty", count("TranAmt").over(sliding_window_spec)) \
       .withColumn("SlideMin", min("TranAmt").over(sliding_window_spec)) \
       .withColumn("SlideMax", max("TranAmt").over(sliding_window_spec)) \
       .withColumn("SlideTotal", sum("TranAmt").over(sliding_window_spec))

display(df2)


AccountId,TranDate,TranAmt,RunTotalAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal
1,2011-01-01,500,500,500.0,1,500,500,500
1,2011-01-15,50,550,275.0,2,50,500,550
1,2011-01-22,250,800,266.6666666666667,3,50,500,800
1,2011-01-24,75,875,125.0,3,50,250,375
1,2011-01-26,125,1000,150.0,3,75,250,450
1,2011-01-28,175,1175,125.0,3,75,175,375
2,2011-01-01,500,500,500.0,1,500,500,500
2,2011-01-15,50,550,275.0,2,50,500,550
2,2011-01-22,25,575,191.66666666666663,3,25,500,575
2,2011-01-23,125,700,66.66666666666667,3,25,125,200


## Funkcje okienkowe

LEAD – pobiera wartość z następnego wiersza

LAG – pobiera wartość z poprzedniego wiersza



In [0]:
from pyspark.sql.functions import lead, lag

window_spec_no_frame = Window.partitionBy("AccountId").orderBy("TranDate")

dfL = df.withColumn("NextTranAmt", lead("TranAmt").over(window_spec_no_frame)) \
        .withColumn("PrevTranAmt", lag("TranAmt").over(window_spec_no_frame))

display(dfL.select("TranAmt", "NextTranAmt", "PrevTranAmt"))

TranAmt,NextTranAmt,PrevTranAmt
500,50.0,
50,250.0,500.0
250,75.0,50.0
75,125.0,250.0
125,175.0,75.0
175,,125.0
500,50.0,
50,25.0,500.0
25,125.0,50.0
125,200.0,25.0



FIRST_VALUE – zwraca pierwszą wartość w oknie

LAST_VALUE – zwraca ostatnią wartość w oknie

In [0]:
from pyspark.sql.functions import first, last

dfV = df.withColumn("FirstTranAmt", first("TranAmt").over(window_spec)) \
       .withColumn("LastTranAmt", last("TranAmt").over(window_spec))

display(dfV.select("TranAmt", "FirstTranAmt", "LastTranAmt"))

TranAmt,FirstTranAmt,LastTranAmt
500,500,500
50,500,50
250,500,250
75,500,75
125,500,125
175,500,175
500,500,500
50,500,50
25,500,25
125,500,125


ROW_NUMBER – nadaje numer wiersza w oknie

In [0]:
from pyspark.sql.functions import row_number

dfRN = df.withColumn("RowNum", row_number().over(window_spec))

display(dfRN)

AccountId,TranDate,TranAmt,RunTotalAmt,RowNum
1,2011-01-01,500,500,1
1,2011-01-15,50,550,2
1,2011-01-22,250,800,3
1,2011-01-24,75,875,4
1,2011-01-26,125,1000,5
1,2011-01-28,175,1175,6
2,2011-01-01,500,500,1
2,2011-01-15,50,550,2
2,2011-01-22,25,575,3
2,2011-01-23,125,700,4
