# Zadanie 3

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import to_date, col

transactions = spark.createDataFrame([
    ( 1, '2011-01-01', 500),
    ( 1, '2011-01-15', 50),
    ( 1, '2011-01-22', 250),
    ( 1, '2011-01-24', 75),
    ( 1, '2011-01-26', 125),
    ( 1, '2011-01-28', 175),
    ( 2, '2011-01-01', 500),
    ( 2, '2011-01-15', 50),
    ( 2, '2011-01-22', 25),
    ( 2, '2011-01-23', 125),
    ( 2, '2011-01-26', 200),
    ( 2, '2011-01-29', 250),
    ( 3, '2011-01-01', 500),
    ( 3, '2011-01-15', 50 ),
    ( 3, '2011-01-22', 5000),
    ( 3, '2011-01-25', 550),
    ( 3, '2011-01-27', 95 ),
    ( 3, '2011-01-30', 2500)],
    ['AccountId', 'TranDate', 'TranAmt'])

transactions = transactions.withColumn("TranDate", to_date("TranDate"))\
    .withColumn("TranAmt", col("TranAmt").cast(DoubleType()))

display(transactions)

AccountId,TranDate,TranAmt
1,2011-01-01,500.0
1,2011-01-15,50.0
1,2011-01-22,250.0
1,2011-01-24,75.0
1,2011-01-26,125.0
1,2011-01-28,175.0
2,2011-01-01,500.0
2,2011-01-15,50.0
2,2011-01-22,25.0
2,2011-01-23,125.0


In [0]:
logical = spark.createDataFrame([
    (1,'George', 800),
    (2,'Sam', 950),
    (3,'Diane', 1100),
    (4,'Nicholas', 1250),
    (5,'Samuel', 1250),
    (6,'Patricia', 1300),
    (7,'Brian', 1500),
    (8,'Thomas', 1600),
    (9,'Fran', 2450),
    (10,'Debbie', 2850),
    (11,'Mark', 2975),
    (12,'James', 3000),
    (13,'Cynthia', 3000),
    (14,'Christopher', 5000)
], ["RowID", "FName", "Salary"])

logical = logical.withColumn("Salary", col("Salary").cast(ShortType()))

display(logical)

RowID,FName,Salary
1,George,800
2,Sam,950
3,Diane,1100
4,Nicholas,1250
5,Samuel,1250
6,Patricia,1300
7,Brian,1500
8,Thomas,1600
9,Fran,2450
10,Debbie,2850


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, count, min as spmin, max as spmax, sum as spsum, row_number

runTotAmtDf = transactions.select(
    "AccountId", "TranDate", "TranAmt",
    spsum("TranAmt").over(
        Window.partitionBy("AccountID").orderBy("TranDate")).alias("RunTotalAmt")
).orderBy(["AccountId", "TranDate"], ascending = [True, True])

display(runTotAmtDf)

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500.0,500.0
1,2011-01-15,50.0,550.0
1,2011-01-22,250.0,800.0
1,2011-01-24,75.0,875.0
1,2011-01-26,125.0,1000.0
1,2011-01-28,175.0,1175.0
2,2011-01-01,500.0,500.0
2,2011-01-15,50.0,550.0
2,2011-01-22,25.0,575.0
2,2011-01-23,125.0,700.0


In [0]:
win2_df = transactions.select(
    "AccountId", "TranDate", "TranAmt",
    
    avg("TranAmt").over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RunAvg"),
    
    count("*").over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RunTranQty"),

    spmin("TranAmt").over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RunSmallAmt"),

    spmax("TranAmt").over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RunLargeAmt"),

    spsum("TranAmt").over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RunTotalAmt")
).orderBy(['AccountId', 'TranDate'], ascending = [True, True])

display(win2_df)

AccountId,TranDate,TranAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt,RunTotalAmt
1,2011-01-01,500.0,500.0,1,500.0,500.0,500.0
1,2011-01-15,50.0,275.0,2,50.0,500.0,550.0
1,2011-01-22,250.0,266.6666666666667,3,50.0,500.0,800.0
1,2011-01-24,75.0,218.75,4,50.0,500.0,875.0
1,2011-01-26,125.0,200.0,5,50.0,500.0,1000.0
1,2011-01-28,175.0,195.83333333333331,6,50.0,500.0,1175.0
2,2011-01-01,500.0,500.0,1,500.0,500.0,500.0
2,2011-01-15,50.0,275.0,2,50.0,500.0,550.0
2,2011-01-22,25.0,191.66666666666663,3,25.0,500.0,575.0
2,2011-01-23,125.0,175.0,4,25.0,500.0,700.0


In [0]:
win_spec = Window.partitionBy("AccountId")\
    .orderBy("TranDate")\
    .rowsBetween(-2, Window.currentRow)

win3_df = transactions.select(
    "AccountId", "TranDate", "TranAmt",

    avg("TranAmt").over(win_spec).alias("SlideAvg"),

    count("TranAmt").over(win_spec).alias("SlideQty"),

    spmin("TranAmt").over(win_spec).alias("SlideMin"),

    spmax("TranAmt").over(win_spec).alias("SlideMax"),

    spsum("TranAmt").over(win_spec).alias("SlideTotal"),

    avg("TranAmt").over(win_spec).alias("SlideAvg"),

    row_number().over(Window.partitionBy("AccountId").orderBy("TranDate")).alias("RN")
).orderBy(["AccountId", "TranDate", "RN"], ascending = [True, True, True])

display(win3_df)

AccountId,TranDate,TranAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal,SlideAvg.1,RN
1,2011-01-01,500.0,500.0,1,500.0,500.0,500.0,500.0,1
1,2011-01-15,50.0,275.0,2,50.0,500.0,550.0,275.0,2
1,2011-01-22,250.0,266.6666666666667,3,50.0,500.0,800.0,266.6666666666667,3
1,2011-01-24,75.0,125.0,3,50.0,250.0,375.0,125.0,4
1,2011-01-26,125.0,150.0,3,75.0,250.0,450.0,150.0,5
1,2011-01-28,175.0,125.0,3,75.0,175.0,375.0,125.0,6
2,2011-01-01,500.0,500.0,1,500.0,500.0,500.0,500.0,1
2,2011-01-15,50.0,275.0,2,50.0,500.0,550.0,275.0,2
2,2011-01-22,25.0,191.66666666666663,3,25.0,500.0,575.0,191.66666666666663,3
2,2011-01-23,125.0,66.66666666666667,3,25.0,125.0,200.0,66.66666666666667,4


In [0]:
win4_df = logical.select(
    "RowID", "FName", "Salary",

    spsum("Salary").over(Window.orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)).alias("SumByRows"),

    spsum("Salary").over(Window.orderBy("Salary").rangeBetween(Window.unboundedPreceding, Window.currentRow)).alias("SumByRange")
).orderBy("RowID")

display(win4_df)

RowID,FName,Salary,SumByRows,SumByRange
1,George,800,800,800
2,Sam,950,1750,1750
3,Diane,1100,2850,2850
4,Nicholas,1250,4100,5350
5,Samuel,1250,5350,5350
6,Patricia,1300,6650,6650
7,Brian,1500,8150,8150
8,Thomas,1600,9750,9750
9,Fran,2450,12200,12200
10,Debbie,2850,15050,15050


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, lag, row_number, first, last, col

# Okno partycjonowane po AccountId i posortowane po dacie
win_spec2 = Window.partitionBy("AccountId").orderBy("TranDate")

# Zastosowanie funkcji okienkowych
win5Df = transactions.select(
    "AccountId", "TranDate", "TranAmt",
    lead("TranAmt", 2, 0).over(win_spec2).alias("TwoLead"),
    lag("TranAmt", 1, 0).over(win_spec2).alias("OneLag"),
    first("TranAmt").over(win_spec2).alias("FirstValue"),
    last("TranAmt").over(win_spec2).alias("LastValue"),
    row_number().over(win_spec2).alias("RowNumber")
)

display(win5Df)


AccountId,TranDate,TranAmt,TwoLead,OneLag,FirstValue,LastValue,RowNumber
1,2011-01-01,500.0,250.0,0.0,500.0,500.0,1
1,2011-01-15,50.0,75.0,500.0,500.0,50.0,2
1,2011-01-22,250.0,125.0,50.0,500.0,250.0,3
1,2011-01-24,75.0,175.0,250.0,500.0,75.0,4
1,2011-01-26,125.0,0.0,75.0,500.0,125.0,5
1,2011-01-28,175.0,0.0,125.0,500.0,175.0,6
2,2011-01-01,500.0,25.0,0.0,500.0,500.0,1
2,2011-01-15,50.0,125.0,500.0,500.0,50.0,2
2,2011-01-22,25.0,200.0,50.0,500.0,25.0,3
2,2011-01-23,125.0,250.0,25.0,500.0,125.0,4
