In [0]:
from pyspark.sql import Row

data = [
    Row(date="2022-01", item="apple", sales=100),
    Row(date="2022-01", item="banana", sales=200),
    Row(date="2022-01", item="grapes", sales=200),
    Row(date="2022-01", item="orange", sales=300),
    Row(date="2022-02", item="apple", sales=150),
    Row(date="2022-02", item="banana", sales=250),
    Row(date="2022-02", item="orange", sales=350),
    Row(date="2022-02", item="grapes", sales=420),
    Row(date="2022-03", item="apple", sales=200),
    Row(date="2022-03", item="banana", sales=300),
    Row(date="2022-03", item="grapes", sales=110),
    Row(date="2022-03", item="orange", sales=400),
]

# Create a DataFrame
df = spark.createDataFrame(data)

In [0]:
display(df)

date,item,sales
2022-01,apple,100
2022-01,banana,200
2022-01,grapes,200
2022-01,orange,300
2022-02,apple,150
2022-02,banana,250
2022-02,orange,350
2022-02,grapes,420
2022-03,apple,200
2022-03,banana,300


In [0]:
from pyspark.sql.window import Window

In [0]:
from pyspark.sql.functions import row_number

windowSpec = Window.orderBy("date")
display(df.withColumn("row_number", row_number().over(windowSpec)))

date,item,sales,row_number
2022-01,apple,100,1
2022-01,banana,200,2
2022-01,grapes,200,3
2022-01,orange,300,4
2022-02,apple,150,5
2022-02,banana,250,6
2022-02,orange,350,7
2022-02,grapes,420,8
2022-03,apple,200,9
2022-03,banana,300,10


In [0]:
from pyspark.sql.functions import rank

windowSpec = Window.orderBy("sales").partitionBy("date")
display(df.withColumn("rank", rank().over(windowSpec)))

date,item,sales,rank
2022-01,apple,100,1
2022-01,banana,200,2
2022-01,grapes,200,2
2022-01,orange,300,4
2022-02,apple,150,1
2022-02,banana,250,2
2022-02,orange,350,3
2022-02,grapes,420,4
2022-03,grapes,110,1
2022-03,apple,200,2


In [0]:
from pyspark.sql.functions import dense_rank

windowSpec = Window.orderBy("sales").partitionBy("date")
display(df.withColumn("rank", dense_rank().over(windowSpec)))

date,item,sales,rank
2022-01,apple,100,1
2022-01,banana,200,2
2022-01,grapes,200,2
2022-01,orange,300,3
2022-02,apple,150,1
2022-02,banana,250,2
2022-02,orange,350,3
2022-02,grapes,420,4
2022-03,grapes,110,1
2022-03,apple,200,2


In [0]:
from pyspark.sql.functions import percent_rank

windowSpec = Window.orderBy("sales").partitionBy("date")
display(df.withColumn("rank", percent_rank().over(windowSpec)))

date,item,sales,rank
2022-01,apple,100,0.0
2022-01,banana,200,0.3333333333333333
2022-01,grapes,200,0.3333333333333333
2022-01,orange,300,1.0
2022-02,apple,150,0.0
2022-02,banana,250,0.3333333333333333
2022-02,orange,350,0.6666666666666666
2022-02,grapes,420,1.0
2022-03,grapes,110,0.0
2022-03,apple,200,0.3333333333333333


In [0]:
from pyspark.sql.functions import lag, lead, col

windowSpec = Window.partitionBy("item").orderBy("date")

In [0]:
display(df.withColumn("prev_sales", lag(col("sales"), 1).over(windowSpec)))

date,item,sales,prev_sales
2022-01,apple,100,
2022-02,apple,150,100.0
2022-03,apple,200,150.0
2022-01,banana,200,
2022-02,banana,250,200.0
2022-03,banana,300,250.0
2022-01,grapes,200,
2022-02,grapes,420,200.0
2022-03,grapes,110,420.0
2022-01,orange,300,


In [0]:
display(df.withColumn("next_sales", lead(col("sales"), 1).over(windowSpec)))

date,item,sales,next_sales
2022-01,apple,100,150.0
2022-02,apple,150,200.0
2022-03,apple,200,
2022-01,banana,200,250.0
2022-02,banana,250,300.0
2022-03,banana,300,
2022-01,grapes,200,420.0
2022-02,grapes,420,110.0
2022-03,grapes,110,
2022-01,orange,300,350.0


In [0]:
display(
    df.withColumn(
        "sales_pct_change",
        (col("sales") - lag(col("sales"), 1).over(windowSpec))
        / lag(col("sales"), 1).over(windowSpec),
    )
)

date,item,sales,sales_pct_change
2022-01,apple,100,
2022-02,apple,150,0.5
2022-03,apple,200,0.3333333333333333
2022-01,banana,200,
2022-02,banana,250,0.25
2022-03,banana,300,0.2
2022-01,grapes,200,
2022-02,grapes,420,1.1
2022-03,grapes,110,-0.7380952380952381
2022-01,orange,300,


In [0]:
data = [("Alice", 25, "NYC"),
        ("Bob", 30, "LA"),
        ("Charlie", 35, "Chicago"),
        ("Dave", 40, "Boston"),
        ("Eve", 45, "Seattle"),
        ("Poly", 24, "Seattle"),
         ("Hepty", 29, "Chicago")]

df = spark.createDataFrame(data, ["name", "age", "city"])

In [0]:
display(df)

name,age,city
Alice,25,NYC
Bob,30,LA
Charlie,35,Chicago
Dave,40,Boston
Eve,45,Seattle
Poly,24,Seattle
Hepty,29,Chicago


In [0]:
from pyspark.sql.functions import desc

windowSpec = Window.orderBy(desc("age")).partitionBy("city")

In [0]:
from pyspark.sql.functions import first, last

display(df.select("*", first("name").over(windowSpec).alias("first_func")))

display(df.select("*", last("name").over(windowSpec).alias("first_func")))

name,age,city,first_func
Dave,40,Boston,Dave
Charlie,35,Chicago,Charlie
Hepty,29,Chicago,Charlie
Bob,30,LA,Bob
Alice,25,NYC,Alice
Eve,45,Seattle,Eve
Poly,24,Seattle,Eve


name,age,city,first_func
Dave,40,Boston,Dave
Charlie,35,Chicago,Charlie
Hepty,29,Chicago,Hepty
Bob,30,LA,Bob
Alice,25,NYC,Alice
Eve,45,Seattle,Eve
Poly,24,Seattle,Poly
