In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder.getOrCreate()

In [2]:
products = spark.createDataFrame([('Product A', 'Category 1', 'Category 3'),
                                  ('Product B', 'Category 2', 'Category 3'),
                                  ('Product C', 'Category 1', 'Category 4'),
                                  ('Product D', None, None),
                                  ('Product E', 'Category 2', None)],

                                 ['product', 'category_1', 'category_2'])

products.show()

+---------+----------+----------+
|  product|category_1|category_2|
+---------+----------+----------+
|Product A|Category 1|Category 3|
|Product B|Category 2|Category 3|
|Product C|Category 1|Category 4|
|Product D|      null|      null|
|Product E|Category 2|      null|
+---------+----------+----------+



In [3]:
result_df = products.select('product', 'category_1').union(products.select('product', 'category_2'))
result_df = result_df.withColumnRenamed("category_1", "category")
result_df.show()

+---------+----------+
|  product|  category|
+---------+----------+
|Product A|Category 1|
|Product B|Category 2|
|Product C|Category 1|
|Product D|      null|
|Product E|Category 2|
|Product A|Category 3|
|Product B|Category 3|
|Product C|Category 4|
|Product D|      null|
|Product E|      null|
+---------+----------+



In [4]:
result_df = result_df.dropDuplicates()
result_df.show()

+---------+----------+
|  product|  category|
+---------+----------+
|Product A|Category 1|
|Product B|Category 2|
|Product C|Category 1|
|Product D|      null|
|Product E|Category 2|
|Product A|Category 3|
|Product B|Category 3|
|Product C|Category 4|
|Product E|      null|
+---------+----------+



In [5]:
result_df = result_df.sort(["product", 'category'], ascending=False)
result_df.show()

+---------+----------+
|  product|  category|
+---------+----------+
|Product E|Category 2|
|Product E|      null|
|Product D|      null|
|Product C|Category 4|
|Product C|Category 1|
|Product B|Category 3|
|Product B|Category 2|
|Product A|Category 3|
|Product A|Category 1|
+---------+----------+



In [6]:
window_spec = Window.partitionBy("product")
result_df = result_df.withColumn("count", count(col("product")).over(window_spec))
result_df.show()

+---------+----------+-----+
|  product|  category|count|
+---------+----------+-----+
|Product A|Category 3|    2|
|Product A|Category 1|    2|
|Product B|Category 3|    2|
|Product B|Category 2|    2|
|Product C|Category 4|    2|
|Product C|Category 1|    2|
|Product D|      null|    1|
|Product E|Category 2|    2|
|Product E|      null|    2|
+---------+----------+-----+



In [7]:
result_df = result_df.filter("NOT (count > 1 and category IS NULL)")
result_df = result_df.drop('count')
result_df.show()

+---------+----------+
|  product|  category|
+---------+----------+
|Product A|Category 3|
|Product A|Category 1|
|Product B|Category 3|
|Product B|Category 2|
|Product C|Category 4|
|Product C|Category 1|
|Product D|      null|
|Product E|Category 2|
+---------+----------+

