### Zadanie 3
Wykorzystaj przykłady z notatnika w SQL Windowed Aggregate Functions (cmd 11) i przepisz funkcje używając Spark API.

In [0]:
from pyspark.sql.functions import col, sum as _sum, avg, count, min as _min, max as _max, row_number, lead, lag, first, last
from pyspark.sql.window import Window

In [0]:
transactions_data = [
    (1, '2011-01-01', 500),
    (1, '2011-01-15', 50),
    (1, '2011-01-22', 250),
    (1, '2011-01-24', 75),
    (1, '2011-01-26', 125),
    (1, '2011-01-28', 175),
    (2, '2011-01-01', 500),
    (2, '2011-01-15', 50),
    (2, '2011-01-22', 25),
    (2, '2011-01-23', 125),
    (2, '2011-01-26', 200),
    (2, '2011-01-29', 250),
    (3, '2011-01-01', 500),
    (3, '2011-01-15', 50),
    (3, '2011-01-22', 5000),
    (3, '2011-01-25', 550),
    (3, '2011-01-27', 95),
    (3, '2011-01-30', 2500)
]

logical_data = [
    (1, 'George', 800),
    (2, 'Sam', 950),
    (3, 'Diane', 1100),
    (4, 'Nicholas', 1250),
    (5, 'Samuel', 1250),
    (6, 'Patricia', 1300),
    (7, 'Brian', 1500),
    (8, 'Thomas', 1600),
    (9, 'Fran', 2450),
    (10, 'Debbie', 2850),
    (11, 'Mark', 2975),
    (12, 'James', 3000),
    (13, 'Cynthia', 3000),
    (14, 'Christopher', 5000)
]

columns_transactions = ['AccountId', 'TranDate', 'TranAmt']
columns_logical = ['RowID', 'FName', 'Salary']

# Tworzenie DataFrame
transactions_df = spark.createDataFrame(transactions_data, columns_transactions)
logical_df = spark.createDataFrame(logical_data, columns_logical)

In [0]:
display(transactions_df)

AccountId,TranDate,TranAmt
1,2011-01-01,500
1,2011-01-15,50
1,2011-01-22,250
1,2011-01-24,75
1,2011-01-26,125
1,2011-01-28,175
2,2011-01-01,500
2,2011-01-15,50
2,2011-01-22,25
2,2011-01-23,125


In [0]:
display(logical_df)

RowID,FName,Salary
1,George,800
2,Sam,950
3,Diane,1100
4,Nicholas,1250
5,Samuel,1250
6,Patricia,1300
7,Brian,1500
8,Thomas,1600
9,Fran,2450
10,Debbie,2850


In [0]:
# Okno na podstawie AccountId i posortowane według TranDate
windowSpec = Window.partitionBy("AccountId").orderBy("TranDate")

transactions_with_window = transactions_df.withColumn(
    "RunTotalAmt", _sum("TranAmt").over(windowSpec)
)

display(transactions_with_window)

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500,500
1,2011-01-15,50,550
1,2011-01-22,250,800
1,2011-01-24,75,875
1,2011-01-26,125,1000
1,2011-01-28,175,1175
2,2011-01-01,500,500
2,2011-01-15,50,550
2,2011-01-22,25,575
2,2011-01-23,125,700


In [0]:
transactions_with_window = transactions_df.withColumn(
    "RunAvg", avg("TranAmt").over(windowSpec)
).withColumn(
    "RunTranQty", count("*").over(windowSpec)
).withColumn(
    "RunSmallAmt", _min("TranAmt").over(windowSpec)
).withColumn(
    "RunLargeAmt", _max("TranAmt").over(windowSpec)
).withColumn(
    "RunTotalAmt", _sum("TranAmt").over(windowSpec)
)

display(transactions_with_window)

AccountId,TranDate,TranAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt,RunTotalAmt
1,2011-01-01,500,500.0,1,500,500,500
1,2011-01-15,50,275.0,2,50,500,550
1,2011-01-22,250,266.6666666666667,3,50,500,800
1,2011-01-24,75,218.75,4,50,500,875
1,2011-01-26,125,200.0,5,50,500,1000
1,2011-01-28,175,195.83333333333331,6,50,500,1175
2,2011-01-01,500,500.0,1,500,500,500
2,2011-01-15,50,275.0,2,50,500,550
2,2011-01-22,25,191.66666666666663,3,25,500,575
2,2011-01-23,125,175.0,4,25,500,700


In [0]:
# Okno z przesunięciem na 2 poprzednie wiersze
windowSpecSlide = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(-2, 0)

transactions_with_slide = transactions_df.withColumn(
    "SlideAvg", avg("TranAmt").over(windowSpecSlide)
).withColumn(
    "SlideQty", count("*").over(windowSpecSlide)
).withColumn(
    "SlideMin", _min("TranAmt").over(windowSpecSlide)
).withColumn(
    "SlideMax", _max("TranAmt").over(windowSpecSlide)
).withColumn(
    "SlideTotal", _sum("TranAmt").over(windowSpecSlide)
).withColumn(
    "RN", row_number().over(windowSpec)
)

display(transactions_with_slide)

AccountId,TranDate,TranAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal,RN
1,2011-01-01,500,500.0,1,500,500,500,1
1,2011-01-15,50,275.0,2,50,500,550,2
1,2011-01-22,250,266.6666666666667,3,50,500,800,3
1,2011-01-24,75,125.0,3,50,250,375,4
1,2011-01-26,125,150.0,3,75,250,450,5
1,2011-01-28,175,125.0,3,75,175,375,6
2,2011-01-01,500,500.0,1,500,500,500,1
2,2011-01-15,50,275.0,2,50,500,550,2
2,2011-01-22,25,191.66666666666663,3,25,500,575,3
2,2011-01-23,125,66.66666666666667,3,25,125,200,4


In [0]:
# Okno dla kolumn Salary
windowSpecLogical = Window.orderBy("Salary").rowsBetween(Window.unboundedPreceding, 0)

logical_with_window = logical_df.withColumn(
    "SumByRows", _sum("Salary").over(windowSpecLogical)
).withColumn(
    "SumByRange", _sum("Salary").over(Window.orderBy("Salary").rangeBetween(Window.unboundedPreceding, 0))
)

display(logical_with_window)

RowID,FName,Salary,SumByRows,SumByRange
1,George,800,800,800
2,Sam,950,1750,1750
3,Diane,1100,2850,2850
4,Nicholas,1250,4100,5350
5,Samuel,1250,5350,5350
6,Patricia,1300,6650,6650
7,Brian,1500,8150,8150
8,Thomas,1600,9750,9750
9,Fran,2450,12200,12200
10,Debbie,2850,15050,15050


### Zadanie 4
Do tego notatnika dopisz użycie funkcji okienkowych LEAD, LAG, FIRST_VALUE, LAST_VALUE, ROW_NUMBER 

In [0]:
# Okno na podstawie AccountId i posortowane według TranDate
windowSpec = Window.partitionBy("AccountId").orderBy("TranDate")

# Obliczenia z użyciem funkcji okienkowych
transactions_with_window_functions = transactions_df.withColumn(
    "NextTranAmt", lead("TranAmt", 1).over(windowSpec)  # LEAD - Umożliwia uzyskanie wartości w następnym wierszu.
).withColumn(
    "PrevTranAmt", lag("TranAmt", 1).over(windowSpec)  # LAG - Umożliwia uzyskanie wartości w poprzednim wierszu.
).withColumn(
    "FirstTranAmt", first("TranAmt").over(windowSpec)  # FIRST_VALUE - Zwraca pierwszą wartość w obrębie okna.
).withColumn(
    "LastTranAmt", last("TranAmt").over(windowSpec)  # LAST_VALUE - Zwraca ostatnią wartość w obrębie okna.
).withColumn(
    "RowNum", row_number().over(windowSpec)  # ROW_NUMBER - Numeruje wiersze w obrębie okna.
)

display(transactions_with_window_functions)


AccountId,TranDate,TranAmt,NextTranAmt,PrevTranAmt,FirstTranAmt,LastTranAmt,RowNum
1,2011-01-01,500,50.0,,500,500,1
1,2011-01-15,50,250.0,500.0,500,50,2
1,2011-01-22,250,75.0,50.0,500,250,3
1,2011-01-24,75,125.0,250.0,500,75,4
1,2011-01-26,125,175.0,75.0,500,125,5
1,2011-01-28,175,,125.0,500,175,6
2,2011-01-01,500,50.0,,500,500,1
2,2011-01-15,50,25.0,500.0,500,50,2
2,2011-01-22,25,125.0,50.0,500,25,3
2,2011-01-23,125,200.0,25.0,500,125,4
