# PySpark Window Function
PySpark window functions are a set of SQL-like operations that allow you to perform calculations across a group of rows that are related to the current row, but without collapsing the rows into a single row. These functions are particularly useful for tasks such as ranking, aggregating over specific partitions, and calculating cumulative or rolling statistics.

**Key Features of PySpark Window Functions**

1. Operate on a "Window" of Rows: Define a subset of data (the "window") for each row based on certain criteria like partitioning and ordering.
1. Non-collapsing: Unlike groupBy, window functions keep the number of rows unchanged.
1. SQL and Functional API: Can be used with both SQL queries and PySpark's DataFrame API.


**Common Use Cases**<br>
- **Ranking rows**: Assign ranks to rows within a partition.
- **Cumulative calculations**: Compute running totals, averages, etc.
- **Lag/Lead**: Access previous or next row values.
- **Aggregations**: Perform operations like min, max, avg over a specified window.

**Syntax**<br>
Using a window function involves three steps:

1. Define the Window:
    - Partition: Specifies the grouping of rows.
    - Order: Specifies the sorting within each partition.
1. Apply the Function: Perform an operation (e.g., sum, rank).
1. Use the Result: Add the calculated column to the DataFrame.
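
The three steps above can be emulated in plain Python to make the row-preserving behaviour visible without a Spark session. This is only a sketch of the semantics, not how Spark executes a window: the helper `running_total_by_department` and its sample rows are hypothetical, and salaries are distinct within each department so tie handling does not come into play.

```python
from collections import defaultdict
from itertools import accumulate

# Hypothetical sample rows: (employee_name, department, salary)
rows = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Maria", "Finance", 3000),
    ("Scott", "Finance", 3300),
]


def running_total_by_department(rows):
    """Emulate a running sum of salary, partitioned by department and ordered by salary."""
    # Step 1 (partition): group the rows by department.
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)

    result = []
    for department in sorted(partitions):
        # Step 1 (order): sort the rows inside the partition by salary.
        ordered = sorted(partitions[department], key=lambda r: r[2])
        # Step 2 (apply): compute the function over the window, here a running sum.
        totals = accumulate(r[2] for r in ordered)
        # Step 3 (use): attach the result as a new column; no rows are collapsed.
        result.extend(row + (total,) for row, total in zip(ordered, totals))
    return result


for row in running_total_by_department(rows):
    print(row)
# ('Maria', 'Finance', 3000, 3000)
# ('Scott', 'Finance', 3300, 6300)
# ('James', 'Sales', 3000, 3000)
# ('Michael', 'Sales', 4600, 7600)
```

The output has as many rows as the input, which is the difference from a groupBy aggregation that would return one row per department.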

**Examples**

In [0]:
from pyspark.sql import SparkSession

# Initialize the SparkSession with a specific application name
spark = (SparkSession.builder
         .appName('PySpark Window Function')
         .getOrCreate())

spark

In [0]:
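# Note: the wildcard import shadows Python built-ins such as sum, round, min and max
# with their Spark column-function equivalents; the later cells rely on this.
from pyspark.sql.functions import *
from pyspark.sql.window import Window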

**Ranking**: Rank employees within each department based on salary

In [0]:
data = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
]

columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=data, schema=columns)

df.printSchema()
df.show()

### Ranking functions

PySpark’s Window Ranking functions, such as row_number(), rank(), and dense_rank(), are used to assign unique identifiers or ranks to rows within a specific partition of a dataset. These functions operate over a window, which is a subset of data defined by a partitioning and ordering logic. They are useful for tasks like ordering, ranking, and identifying specific rows based on the specified conditions.

**Key Concepts**
- **Partition**: Divides the dataset into groups based on one or more columns (e.g., department).
- **Ordering**: Determines the sequence of rows within each partition (e.g., by salary).
- **Sequential Assignment**: These functions assign numbers to rows in the order defined by the partition and sorting criteria.

**Key Benefits**
- **Enhanced Data Insights**: Easily analyze and compare rows within groups.
- **Versatility**: Useful in real-world scenarios like leaderboard rankings, pagination, and top-N analysis.
- **Control Over Ties**: Choose between rank and dense_rank depending on how you want to handle ties.
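
To make the tie handling concrete, here is a plain-Python sketch of the three numbering schemes applied to an already sorted list. The helper `ranking_columns` is hypothetical and only mirrors the semantics; the input is the set of "Sales" salaries from the sample data above, which contains one tie.

```python
def ranking_columns(sorted_values):
    """Return (value, row_number, rank, dense_rank) for an already sorted list."""
    out = []
    rank = 0
    dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position  # rank jumps to the current position, leaving gaps after ties
            dense += 1       # dense_rank advances by one, leaving no gaps
            previous = value
        # row_number is simply the position, so it is always unique
        out.append((value, position, rank, dense))
    return out


# Salaries of the "Sales" department from the sample data, ascending.
sales_salaries = sorted([3000, 4600, 4100, 4100])
for row in ranking_columns(sales_salaries):
    print(row)
# (3000, 1, 1, 1)
# (4100, 2, 2, 2)
# (4100, 3, 2, 2)
# (4600, 4, 4, 3)
```

The two rows with salary 4100 share a rank and a dense_rank but get different row numbers. In Spark, which of the tied rows receives the lower row number is not guaranteed unless the ordering includes a tie-breaking column.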

In [0]:
window_spec_ex = Window.partitionBy("department").orderBy("salary")

(df.withColumn("row_number", row_number().over(window_spec_ex))
   .withColumn("rank", rank().over(window_spec_ex))
   .withColumn("dense_rank", dense_rank().over(window_spec_ex))
   .show())

### Top-Selling Product per Category

In [0]:
%run ../DatasetSourcePath

In [0]:
path = sourcePath + '/dataset/sales_data_cleaned.csv'
salesdf = spark.read.csv(
    path,
    header=True,
    inferSchema=True,
).drop('Month')

print(salesdf.count())
salesdf.printSchema()
salesdf.show(5)

In [0]:
tot_salesdf = (salesdf
               .filter("Category != 'Unknown'")
               .groupBy('Category', 'Product')
               .agg(
                   round(sum(col("Sales") * col("Quantity"))).alias("TotalSales")
               ))
tot_salesdf.show()

In [0]:
window_spec = Window.partitionBy("Category").orderBy(col("TotalSales").desc())

# Number the products within each category by TotalSales and keep only the top one
ranked_df = (tot_salesdf
             .withColumn("row_number", row_number().over(window_spec))
             .filter(col("row_number") == 1)
             .select('Category', 'Product', 'TotalSales')
             .sort(col('TotalSales').desc()))

ranked_df.show(5)

### ntile window function

The ```ntile()``` window function in PySpark distributes the rows of a partition into a specified number of buckets, based on the ordering of the rows within that partition. The rows are divided as evenly as possible, and each row is assigned a bucket number from 1 to n, where n is the number of buckets.

**How ntile() works:**
- The function takes a single argument, which is the number of buckets (n) to divide the data into.
- It returns the bucket number for each row in the ordered set.
- The rows are first ordered according to a specified column and then divided into n groups as evenly as possible.
    - If the rows cannot be evenly divided, some buckets may contain one more row than others.
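
The bucket sizes described above can be worked out with a few lines of plain Python. This is a sketch of the standard ntile semantics, not Spark's implementation; the helper name `ntile_buckets` is hypothetical. When the rows do not divide evenly, the extra rows go to the first buckets.

```python
def ntile_buckets(n_buckets, n_rows):
    """Return the bucket number (1..n_buckets) for each of n_rows ordered rows."""
    base, extra = divmod(n_rows, n_buckets)
    buckets = []
    for bucket in range(1, n_buckets + 1):
        # The first `extra` buckets each receive one additional row.
        size = base + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * size)
    return buckets


# Nine ordered rows (the size of the employee sample) split into quartiles.
print(ntile_buckets(4, 9))
# [1, 1, 1, 2, 2, 3, 3, 4, 4]
```

With nine rows and four buckets, the first bucket holds three rows and the remaining three hold two each.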

**Use Cases for ntile():**
- **Percentile Calculations**: Dividing data into quantiles like quartiles, deciles, etc., for analysis such as statistical summaries.
- **Categorization**: Assigning categories to data points based on ranks, such as splitting data into high, medium, and low categories.
- **Segmenting Data**: Segmenting users, customers, or employees based on certain metrics (e.g., income, sales performance, etc.) into equal-sized buckets.

**Example**: Each employee is assigned a Salary_Quartile based on their salary relative to others.

In [0]:
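# Define the window specification (order by salary, descending).
# There is no partitionBy, so Spark moves all rows into a single partition;
# that is fine for this small sample but costly on large data.
window_spec_ex = Window.orderBy(col("salary").desc())
df.withColumn("Salary_Quartile", ntile(4).over(window_spec_ex)).show()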

In [0]:
window_spec = Window.partitionBy("Category").orderBy(col("TotalSales").desc())
tot_salesdf.withColumn("ntile", ntile(2).over(window_spec)).show()

### Cumulative Distribution Window Function

The ```cume_dist()``` window function in PySpark calculates the cumulative distribution of a value within a partition: it measures where the current row sits relative to all the other rows in the partition under a specific ordering. The result is a relative position that is greater than 0 and at most 1.

In other words, with ascending order, cume_dist() gives the fraction of rows in the partition whose value is less than or equal to the current row's value. Rows with equal values receive the same result.
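
As a worked example of that definition, the following plain-Python sketch computes the fraction directly for the "Sales" salaries from the sample data. The helper `cume_dist_values` is hypothetical and assumes ascending order.

```python
def cume_dist_values(sorted_values):
    """For each value in an ascending list, return the fraction of values <= it."""
    total = len(sorted_values)
    return [
        sum(1 for other in sorted_values if other <= value) / total
        for value in sorted_values
    ]


# Salaries of the "Sales" department from the sample data, ascending.
sales_salaries = sorted([3000, 4600, 4100, 4100])
print(list(zip(sales_salaries, cume_dist_values(sales_salaries))))
# [(3000, 0.25), (4100, 0.75), (4100, 0.75), (4600, 1.0)]
```

Both rows with salary 4100 get 0.75, because three of the four rows have a salary of 4100 or less. The last row in a partition always gets 1.0.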

In [0]:
# Apply cume_dist function to calculate the cumulative distribution of salaries
window_spec_ex = Window.partitionBy("department").orderBy("salary")
df.withColumn("CumeDist", cume_dist().over(window_spec_ex)).show()

In [0]:
window_spec = Window.partitionBy("Category").orderBy(col("TotalSales"))
tot_salesdf.withColumn("CumeDist", cume_dist().over(window_spec)).show()

### Lag and Lead Window Functions

The ```lag()``` window function in PySpark accesses data from a previous row in the same result set, without needing a self-join. It lets you compare the current row's value with earlier rows, based on a specified order within a partition. The ```lead()``` function is its counterpart and reads from a following row instead.

**Syntax:** ```df.withColumn("lag_column", lag(column_name, offset, default_value).over(window_spec))```

**Where:**
- **column_name**: The column from which you want to retrieve the lagged value.
- **offset**: (Optional) The number of rows before the current row to access. By default, it's 1.
- **default_value**: (Optional) The value to return if there is no previous row. By default, it's None.
- **window_spec**: Defines how to partition and order the data.

**How it Works:**
- The lag() function returns the value of the column from a previous row based on the specified offset (number of rows before the current row).
- It requires a window specification that defines how the data should be partitioned and ordered.
- If there is no previous row (e.g., for the first row), the lag() function returns the default value or null if no default value is provided.
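
The offset and default behaviour can be sketched in plain Python on a single ordered partition. The helpers `lag_values` and `lead_values` are hypothetical, assume a non-negative offset, and only mirror the semantics described above; the input is the "Finance" salaries from the sample data.

```python
def lag_values(values, offset=1, default=None):
    """Value `offset` rows before each position, or `default` when there is none."""
    return [
        values[i - offset] if i - offset >= 0 else default
        for i in range(len(values))
    ]


def lead_values(values, offset=1, default=None):
    """Value `offset` rows after each position, or `default` when there is none."""
    return [
        values[i + offset] if i + offset < len(values) else default
        for i in range(len(values))
    ]


# Salaries of the "Finance" department from the sample data, ascending.
finance_salaries = sorted([3000, 3300, 3900])
print(lag_values(finance_salaries))              # [None, 3000, 3300]
print(lead_values(finance_salaries))             # [3300, 3900, None]
print(lag_values(finance_salaries, default=0))   # [0, 3000, 3300]
```

The first row has no predecessor and the last row has no successor, so those positions take the default value.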

In [0]:
window_spec_ex = Window.partitionBy("department").orderBy("salary")
(df.withColumn("lag", lag("salary").over(window_spec_ex))
   .withColumn("lead", lead("salary").over(window_spec_ex))
   .show())

In [0]:
# Filter for the year 2023, Group by Month and calculate total sales
df_monthly_sales = (salesdf
                    .filter(year(col("Date")) == 2023)
                    .groupBy(month(col("Date")).alias("Month"))
                    .agg(
                        round(sum("Sales"),2).alias("TotalSales")
                    ).sort('Month'))

df_monthly_sales.show(10)

In [0]:
# Define the Window specification (order by Month)
window_spec = Window.orderBy("Month")

# Apply lag() and lead() to get the previous and next month's sales
df_with_lag_lead = df_monthly_sales.withColumn(
    "PreviousMonthSales", 
    lag("TotalSales", 1).over(window_spec)
).withColumn(
    "NextMonthSales", 
    lead("TotalSales", 1).over(window_spec)
)

df_with_lag_lead.show()

### Find the top order placed by each customer

In [0]:
customers = [
    (1, "Robert", "NY"),
    (2, "Denial", "CL"),
    (3, "Demitri", "LA"),
    (4, "Rabita", "LA")
]

cdf = spark.createDataFrame(customers, schema=("cust_id", "name", "city"))
cdf.show()
orders = [
    (1, "Tablet", 670),
    (1, "Head Phone", 250),
    (2, "Mouse", 199),
    (2, "Mobile", 485),
    (4, "Tablet X", 942),
    (3, "Head Phone X+", 399),
    (1, "Mobile", 289),
    (3, "SSD", 459),
    (4, "Monitor", 549)
]
odf = spark.createDataFrame(orders, schema=('cust_id', 'product', 'sales'))
odf.show()

In [0]:
joined_df = cdf.join(odf, on="cust_id")
joined_df.show()

In [0]:
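# rank() keeps every order tied for a customer's highest sales;
# use row_number() instead if exactly one row per customer is required.
window_spec = Window.partitionBy("cust_id").orderBy(desc("sales"))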
ranked_df = (joined_df
             .withColumn("rank", rank().over(window_spec))
             .filter(col("rank") == 1)
             .select("cust_id", "name", "city", "product", "sales")
             .sort(desc("sales")))

ranked_df.show(5)

In [0]:
# spark.stop()