In [1]:
from pyspark.sql import SparkSession

#### PySpark Window Functions
PySpark Window functions operate on a group of rows (like frame, partition) and return a single value for every input row. PySpark SQL supports three kinds of window functions:

ranking functions
analytic functions
aggregate functions


To perform an operation on a group first, we need to partition the data using Window.partitionBy() , and for row number and rank function we need to additionally order by on partition data using orderBy clause.

In [2]:
spark = SparkSession.builder.appName("Day5Spark").master("local").getOrCreate()

In [3]:
df = spark.read.format('csv').option('header','true').load('interview_data.csv')

In [4]:
df.printSchema()

root
 |-- (Deal ID): string (nullable = true)
 |-- (Products): string (nullable = true)
 |-- (Industry): string (nullable = true)
 |-- (Category): string (nullable = true)
 |-- (Super Region): string (nullable = true)
 |-- (Start Date): string (nullable = true)
 |-- (Term in Days): string (nullable = true)
 |-- (Amount $): string (nullable = true)



In [5]:
import re
from pyspark.sql import functions as F

df = df.select([F.col(col).alias(col.replace('(', '')) for col in df.columns])
df = df.select([F.col(col).alias(col.replace(')', '')) for col in df.columns])
df = df.select([F.col(col).alias(col.replace(' ', '')) for col in df.columns])
df.printSchema()

root
 |-- DealID: string (nullable = true)
 |-- Products: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- SuperRegion: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- TerminDays: string (nullable = true)
 |-- Amount$: string (nullable = true)



##### row_number() window function is used to give the sequential row number starting from 1 to the result of each window partition.

In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("Products").orderBy("DealID")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+------------------+--------+-----------------------+--------+-----------+----------+----------+------------------+----------+
|DealID            |Products|Industry               |Category|SuperRegion|StartDate |TerminDays|Amount$           |row_number|
+------------------+--------+-----------------------+--------+-----------+----------+----------+------------------+----------+
|null              |null    |null                   |null    |null       |null      |null      |null              |1         |
|null              |null    |null                   |null    |null       |null      |null      |null              |2         |
|null              |null    |null                   |null    |null       |null      |null      |null              |3         |
|null              |null    |null                   |null    |null       |null      |null      |null              |4         |
|null              |null    |null                   |null    |null       |null      |null      |null           

#### rank() window function is used to provide a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

In [7]:
"""rank"""
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)).show()

+------------------+--------+--------------------+--------+-----------+----------+----------+------------------+----+
|            DealID|Products|            Industry|Category|SuperRegion| StartDate|TerminDays|           Amount$|rank|
+------------------+--------+--------------------+--------+-----------+----------+----------+------------------+----+
|              null|    null|                null|    null|       null|      null|      null|              null|   1|
|              null|    null|                null|    null|       null|      null|      null|              null|   1|
|              null|    null|                null|    null|       null|      null|      null|              null|   1|
|              null|    null|                null|    null|       null|      null|      null|              null|   1|
|              null|    null|                null|    null|       null|      null|      null|              null|   1|
|0063a00000gN5cQAAS| ANSIBLE|  Financial Services|      

#### dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps. This is similar to rank() function difference being rank function leaves gaps in rank when there are ties.

In [8]:
"""dens_rank"""
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

+------------------+--------+--------------------+--------+-----------+----------+----------+------------------+----------+
|            DealID|Products|            Industry|Category|SuperRegion| StartDate|TerminDays|           Amount$|dense_rank|
+------------------+--------+--------------------+--------+-----------+----------+----------+------------------+----------+
|              null|    null|                null|    null|       null|      null|      null|              null|         1|
|              null|    null|                null|    null|       null|      null|      null|              null|         1|
|              null|    null|                null|    null|       null|      null|      null|              null|         1|
|              null|    null|                null|    null|       null|      null|      null|              null|         1|
|              null|    null|                null|    null|       null|      null|      null|              null|         1|
|0063a00