In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession on the 'local' master and keep a handle
# to its underlying SparkContext.
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Load the product catalogue; the first CSV line supplies the column names.
# No schema is supplied, so Spark's CSV reader leaves every column as a string.
products = spark.read.csv('./datasets/products.csv', header=True)

In [3]:
# Print the loaded rows as a text table to eyeball the data.
products.show()

+----------+--------+-----+
|   product|category|price|
+----------+--------+-----+
|Samsung TX|  Tablet|  999|
|Samsung JX|  Mobile|  799|
|Redmi Note|  Mobile|  399|
|        Mi|  Mobile|  299|
|      iPad|  Tablet|  789|
|    iPhone|  Mobile|  999|
|  Micromax|  Mobile|  249|
|    Lenovo|  Tablet|  499|
|   OnePlus|  Mobile|  356|
|        Xu|  Tablet|  267|
+----------+--------+-----+



In [4]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [14]:
# Window over each category, most expensive product first.
# `price` is read from the CSV without a schema and is only cast to int in a
# later cell, so at this point it is still a string. Ordering on the raw column
# would compare lexicographically ('99' > '1000'); cast inside the ordering
# expression so the sort is numeric regardless of the column's current type.
window_spec_part_category = (
    Window.partitionBy('category')
    .orderBy(products.price.cast('int').desc())
)

In [15]:
# Column expression: rank of each row within its category window
# (ordering comes from window_spec_part_category, i.e. price descending).
price_rank = func.rank().over(window_spec_part_category)

In [16]:
# Attach the per-category price rank (the `price_rank` expression defined in
# the previous cell) as a new 'rank' column and display the result.
product_rank = products.withColumn('rank', price_rank)
product_rank.show()

+----------+--------+-----+----+
|   product|category|price|rank|
+----------+--------+-----+----+
|    iPhone|  Mobile|  999|   1|
|Samsung JX|  Mobile|  799|   2|
|Redmi Note|  Mobile|  399|   3|
|   OnePlus|  Mobile|  356|   4|
|        Mi|  Mobile|  299|   5|
|  Micromax|  Mobile|  249|   6|
|Samsung TX|  Tablet|  999|   1|
|      iPad|  Tablet|  789|   2|
|    Lenovo|  Tablet|  499|   3|
|        Xu|  Tablet|  267|   4|
+----------+--------+-----+----+



In [25]:
# Replace the 'price' column with its integer-typed equivalent so that
# numeric aggregates and arithmetic below operate on ints.
products = products.withColumn(
    'price',
    func.col('price').cast('int'),
)

In [29]:
# Per-category window, price descending, whose frame is just two rows:
# the row immediately before the current one and the current row itself.
window_spec_prev_and_current_row = (
    Window.partitionBy('category')
    .orderBy(products.price.desc())
    .rowsBetween(-1, Window.currentRow)
)

In [30]:
# Column expression: the larger of the previous row's price and the current
# row's price, evaluated over the two-row frame defined above.
price_max_between_prev_and_current = (
    func.max(func.col('price'))
    .over(window_spec_prev_and_current_row)
)

In [31]:
# Display each product alongside the max price of its two-row frame.
(
    products
    .withColumn('max_price', price_max_between_prev_and_current)
    .show()
)

+----------+--------+-----+---------+
|   product|category|price|max_price|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      799|
|   OnePlus|  Mobile|  356|      399|
|        Mi|  Mobile|  299|      356|
|  Micromax|  Mobile|  249|      299|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      789|
|        Xu|  Tablet|  267|      499|
+----------+--------+-----+---------+



In [33]:
# Per-category window whose frame spans every row of the partition, so an
# aggregate over it yields the partition-wide value for each row.
# The named Window constants are the documented way to express unbounded
# frame edges; the former +/- sys.maxsize sentinels relied on PySpark
# treating extreme integers as "unbounded".
window_spec_all = (
    Window.partitionBy('category')
    .orderBy(products.price.desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

In [34]:
# Column expression: how far each product's price falls below the highest
# price in its category (0 for the most expensive product).
price_diff_against_highest_price_product = (
    func.max('price').over(window_spec_all) - products.price
)

In [35]:
# Display each product with its gap to the category's top price.
(
    products
    .withColumn('diff', price_diff_against_highest_price_product)
    .show()
)

+----------+--------+-----+----+
|   product|category|price|diff|
+----------+--------+-----+----+
|    iPhone|  Mobile|  999|   0|
|Samsung JX|  Mobile|  799| 200|
|Redmi Note|  Mobile|  399| 600|
|   OnePlus|  Mobile|  356| 643|
|        Mi|  Mobile|  299| 700|
|  Micromax|  Mobile|  249| 750|
|Samsung TX|  Tablet|  999|   0|
|      iPad|  Tablet|  789| 210|
|    Lenovo|  Tablet|  499| 500|
|        Xu|  Tablet|  267| 732|
+----------+--------+-----+----+

