In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, percent_rank, when
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder \
    .appName("Read CSV with Header and Delimiter") \
    .getOrCreate()

In [4]:
pricing_path='./pricing_project_dataset.csv'


In [6]:
df = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv(pricing_path)

df.show()


+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+
|grass_region|category_group|   seller_type|shopee_item_id|shopee_model_id|competitor_item_id|competitor_model_id|shopee_model_price_usd|shopee_model_price|competitor_model_price|shopee_gmv_usd|shopee_order|shopee_model_competitiveness_status|
+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+
|          ID|   Electronics|Long Tail (LT)|    5646734211|     3953474697|        9153508336|          992466002|           50.33644918|            779233|                647903|   8758.537644|         174|                       Shopee > CPT|
|          SG|          

In [18]:
total_rows = df.count()
top30_row = int(total_rows * 0.3)

In [19]:
df1 = df.select("grass_region", "shopee_model_id", "shopee_order"). \
    orderBy(df["shopee_order"].desc()). \
    limit(top30_row)

df1.show()

+------------+---------------+------------+
|grass_region|shopee_model_id|shopee_order|
+------------+---------------+------------+
|          VN|     4317981410|          99|
|          ID|     6389067449|          99|
|          TH|     6574977902|          99|
|          ID|     9429269599|          99|
|          ID|     3220375140|          99|
|          SG|     5251047774|          99|
|          SG|     4259076905|          99|
|          ID|      193561413|          99|
|          ID|     9489720834|          99|
|          MY|     9978953060|          99|
|          TH|     4374479328|          99|
|          ID|     7727102321|          99|
|          SG|     5873574822|          99|
|          ID|     1091036890|          99|
|          PH|     9726452016|          99|
|          TH|     2629184467|          99|
|          ID|     1681435074|          99|
|          TH|     1905139218|          99|
|          ID|     1843321545|          99|
|          TH|     2414484961|  

In [29]:
result = df1.select("grass_region"). \
        groupBy("grass_region"). \
        count()

result.show()

+------------+-----+
|grass_region|count|
+------------+-----+
|          ID| 2507|
|          TH| 1283|
|          SG|  600|
|          VN|  637|
|          MY|  636|
|          PH|  636|
+------------+-----+



In [7]:
df.createOrReplaceTempView("local_price_competitive_by_sku")

In [14]:
window = Window.orderBy(col("shopee_order").desc())
df_with_rank = df.withColumn("platform_order_percent_rank", percent_rank().over(window))

df_with_rank.show()


+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+---------------------------+
|grass_region|category_group|   seller_type|shopee_item_id|shopee_model_id|competitor_item_id|competitor_model_id|shopee_model_price_usd|shopee_model_price|competitor_model_price|shopee_gmv_usd|shopee_order|shopee_model_competitiveness_status|platform_order_percent_rank|
+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+---------------------------+
|          ID|   Electronics|      Non-Mall|    7007766139|     3220375140|        7624501546|         3846499274|           9.058181943|            140225|                148246|   89

In [18]:
df_flagged = df_with_rank.withColumn("top_30_percent_flag", 
                                     when(col("platform_order_percent_rank") <= 0.3, 1).otherwise(0)).select("*")
df_flagged.show()

+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+---------------------------+-------------------+
|grass_region|category_group|   seller_type|shopee_item_id|shopee_model_id|competitor_item_id|competitor_model_id|shopee_model_price_usd|shopee_model_price|competitor_model_price|shopee_gmv_usd|shopee_order|shopee_model_competitiveness_status|platform_order_percent_rank|top_30_percent_flag|
+------------+--------------+--------------+--------------+---------------+------------------+-------------------+----------------------+------------------+----------------------+--------------+------------+-----------------------------------+---------------------------+-------------------+
|          ID|   Electronics|      Non-Mall|    7007766139|     3220375140|        7624501546|         3846499274|          