In [0]:

%sql
Create database if not exists Sample

In [0]:

%sql

CREATE TABLE IF NOT EXISTS Sample.Transactions ( AccountId INT, TranDate DATE, TranAmt DECIMAL(8, 2));

CREATE TABLE IF NOT EXISTS Sample.Logical (RowID INT,FName VARCHAR(20), Salary SMALLINT);
     

In [0]:
%sql

INSERT INTO Sample.Transactions VALUES 
( 1, '2011-01-01', 500),
( 1, '2011-01-15', 50),
( 1, '2011-01-22', 250),
( 1, '2011-01-24', 75),
( 1, '2011-01-26', 125),
( 1, '2011-01-28', 175),
( 2, '2011-01-01', 500),
( 2, '2011-01-15', 50),
( 2, '2011-01-22', 25),
( 2, '2011-01-23', 125),
( 2, '2011-01-26', 200),
( 2, '2011-01-29', 250),
( 3, '2011-01-01', 500),
( 3, '2011-01-15', 50 ),
( 3, '2011-01-22', 5000),
( 3, '2011-01-25', 550),
( 3, '2011-01-27', 95 ),
( 3, '2011-01-30', 2500)

num_affected_rows,num_inserted_rows
18,18


In [0]:
%sql
INSERT INTO Sample.Logical
VALUES (1,'George', 800),
(2,'Sam', 950),
(3,'Diane', 1100),
(4,'Nicholas', 1250),
(5,'Samuel', 1250),
(6,'Patricia', 1300),
(7,'Brian', 1500),
(8,'Thomas', 1600),
(9,'Fran', 2450),
(10,'Debbie', 2850),
(11,'Mark', 2975),
(12,'James', 3000),
(13,'Cynthia', 3000),
(14,'Christopher', 5000);

num_affected_rows,num_inserted_rows
14,14


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
df = spark.table("Sample.Transactions")
display(df)

AccountId,TranDate,TranAmt
1,2011-01-01,500.0
1,2011-01-15,50.0
1,2011-01-22,250.0
1,2011-01-24,75.0
1,2011-01-26,125.0
1,2011-01-28,175.0
2,2011-01-01,500.0
2,2011-01-15,50.0
2,2011-01-22,25.0
2,2011-01-23,125.0


In [0]:
win = Window.partitionBy("AccountId").orderBy("TranDate")

df_windowed = (
    df
    # pobiera wartość TranAmt z poprzedniego wiersza
    .withColumn("PrevTranAmt", lag("TranAmt", 1).over(win))
    
    # pobiera wartość TranAmt z następnego wiersza
    .withColumn("NextTranAmt", lead("TranAmt", 1).over(win))
    
    # pokazuje pierwszą kwotę transakcji dla danego konta
    .withColumn("FirstTranAmt", first("TranAmt").over(win))
    
    # pokazuje ostatnią kwotę transakcji do tego momentu dla danego konta
    .withColumn("LastTranAmt", last("TranAmt").over(win))
    
    # nadaje numer porządkowy każdej transakcji
    .withColumn("RowNum", row_number().over(win))
)
display(df_windowed)

AccountId,TranDate,TranAmt,PrevTranAmt,NextTranAmt,FirstTranAmt,LastTranAmt,RowNum
1,2011-01-01,500.0,,500.0,500.0,500.0,1
1,2011-01-01,500.0,500.0,500.0,500.0,500.0,2
1,2011-01-01,500.0,500.0,50.0,500.0,500.0,3
1,2011-01-15,50.0,500.0,50.0,500.0,50.0,4
1,2011-01-15,50.0,50.0,50.0,500.0,50.0,5
1,2011-01-15,50.0,50.0,250.0,500.0,50.0,6
1,2011-01-22,250.0,50.0,250.0,500.0,250.0,7
1,2011-01-22,250.0,250.0,250.0,500.0,250.0,8
1,2011-01-22,250.0,250.0,75.0,500.0,250.0,9
1,2011-01-24,75.0,250.0,75.0,500.0,75.0,10
