#Tier at purchase/pyspark

In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, max, when, lag, to_date, coalesce, row_number, lit, year, month
from pyspark.sql.window import Window
from ipywidgets import widgets
import matplotlib.pyplot as plt
##from IPython.display import display
##import networkx as nx ## need to install networkx to cluster to enable visualizations


In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

members_loyalty_engagements = spark.table("cadp_consumer_sciences_lh.members_loyalty_engagements")
members_loyalty_tiers = spark.table("cadp_consumer_sciences_lh.members_loyalty_tiers")
consumer_profile = spark.table("cadp_consumer_sciences_lh.consumer_profile")
members_loyalty_expired_asset = spark.table("cadp_consumer_sciences_lh.members_loyalty_expired_asset")


In [0]:
# First, filter the consumer_profile DataFrame for non-null loyalty_memberid
consumer_profile_filtered = consumer_profile.filter(consumer_profile.loyalty_memberid.isNotNull())

# Join operations, ensuring we use aliases and specify columns unambiguously
tier_info = members_loyalty_engagements.alias('l') \
    .join(members_loyalty_tiers.alias('t'), 'acid', 'left_outer') \
    .join(consumer_profile_filtered.alias('cp'), 'acid', 'inner') \
    .filter((F.col('l.order_number').isNotNull()) & (F.col('l.engagement_date') > '2018-01-01')) \
    .groupBy('l.acid', 'l.order_number') \
    .agg(
        F.max('l.tier_details_id').alias('tier_after_purchase'),
        F.max(F.when(F.col('t.tier_achieved_date') <= F.col('l.engagement_date'), F.col('t.tier_details_id'))).alias('tier_before_purchase_raw'),
        F.max(F.when(F.col('t.tier_achieved_date') <= F.col('l.engagement_date'), F.col('t.tier_achieved_date'))).alias('tier_date'),
        F.max('l.engagement_date').alias('order_date'),
        F.first('t.country_code').alias('country_code')  # Adding country_code
    )

# Window specification for calculating previous order date and tier
windowSpec = Window.partitionBy('acid').orderBy('order_date')

# Calculating previous order date and tier
previous_order_date = tier_info.withColumn('prev_order_date', F.lag('order_date').over(windowSpec)) \
    .withColumn('prev_tier_after_purchase', F.lag('tier_after_purchase').over(windowSpec))

# Joining with members_loyalty_expired_asset for expired_assets
expired_assets = previous_order_date.alias('p') \
    .join(members_loyalty_expired_asset.alias('ea'), (F.col('p.acid') == F.col('ea.acid')) & (F.col('ea.expiry_date') > F.col('p.prev_order_date')) & (F.col('ea.expiry_date') <= F.col('p.order_date')), 'left_outer') \
    .select('p.acid', 'p.country_code', 'p.order_number', 'p.order_date', 'p.prev_order_date', 'p.tier_after_purchase', 'p.tier_before_purchase_raw', 'p.tier_date', 'p.prev_tier_after_purchase', F.col('ea.asset_type').alias('neg_tier_movement'))

# Assigning row numbers to orders
ranked_orders = expired_assets.withColumn('rn', F.row_number().over(windowSpec))

# Final selection and transformation
final_df = ranked_orders.select(
    'acid',
    'order_number',
    'country_code',  # Added here
    'tier_after_purchase',
    F.to_date('tier_date').alias('tier_date'),
    F.to_date('order_date').alias('order_date'),
    F.coalesce('tier_before_purchase_raw', F.lit(0)).alias('tier_before_purchase'),
    F.when(F.col('rn') == 1, 'first_purchase')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) < F.coalesce('tier_after_purchase', F.lit(0)), 'Upgrade')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) > F.coalesce('tier_after_purchase', F.lit(0)), 'Downgrade')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) == F.coalesce('tier_after_purchase', F.lit(0)), 'No Change')
    .otherwise('subsequent_purchase').alias('action_before_purchase'),
    F.max('neg_tier_movement').over(Window.partitionBy('acid', 'order_number')).alias('neg_tier_movement')
)

# Optional: Creating a temporary view for SQL-like querying
final_df.createOrReplaceTempView("tier_at_purchase")

#Examples

In [0]:
specific_user_df = spark.sql("SELECT * FROM tier_at_purchase WHERE acid = '004K2KV4OEU0OCOB'")
display(specific_user_df)

acid,order_number,country_code,tier_after_purchase,tier_date,order_date,tier_before_purchase,action_before_purchase,neg_tier_movement
004K2KV4OEU0OCOB,448900000001010000000000000004230513042022,GB,2,2021-11-14,2022-04-13,2,No Change,
004K2KV4OEU0OCOB,AUK30094161,GB,0,,2020-11-28,0,first_purchase,
004K2KV4OEU0OCOB,AUK31207140,GB,0,,2020-12-20,0,No Change,
004K2KV4OEU0OCOB,AUK35386658,GB,1,,2021-04-21,0,Upgrade,
004K2KV4OEU0OCOB,AUK38693067,GB,1,2021-04-23,2021-07-27,1,No Change,
004K2KV4OEU0OCOB,AUK38711593,GB,1,2021-04-23,2021-07-27,1,No Change,
004K2KV4OEU0OCOB,AUK39164693,GB,1,2021-04-23,2021-08-02,1,No Change,
004K2KV4OEU0OCOB,AUK43530463,GB,2,2021-04-23,2021-11-13,1,Upgrade,
004K2KV4OEU0OCOB,AUK44125502,GB,2,2021-11-14,2021-11-23,2,No Change,
004K2KV4OEU0OCOB,AUK47305665,GB,2,2021-11-14,2022-02-18,2,No Change,


In [0]:

# Filter for upgrades in January in the US and CA
upgrades_in_jan_us_ca = final_df.filter(
    (final_df.action_before_purchase == 'Upgrade') & 
    (final_df.country_code.isin('US', 'CA')) &
    (month(final_df.order_date) == 1) &
    (year(final_df.order_date) == 2024) 
)

# Count the number of users
count_upgrades = upgrades_in_jan_us_ca.count()

print(f"Number of users who upgraded in January in the US and CA: {count_upgrades}")


Number of users who upgraded in January in the US and CA: 81917


In [0]:
from pyspark.sql import functions as F

# Filter for actions (upgraded, downgraded) in January and group by country and action
actions_count_by_country_jan = final_df.filter(
    (F.col('action_before_purchase').isin(['Upgrade', 'Downgrade'])) &
    (F.month('order_date') == 1)
).groupBy('country_code', 'action_before_purchase').count()

# The resulting DataFrame 'actions_count_by_country_jan' will have columns: country_code, action_before_purchase, and count
# It lists the number of customers who either upgraded or downgraded, categorized by country for the month of January.



In [0]:
display(actions_count_by_country_jan)

country_code,action_before_purchase,count
VN,Upgrade,3965
NL,Upgrade,16883
IN,Downgrade,11220
JP,Downgrade,43236
GB,Upgrade,125782
IN,Upgrade,30769
CO,Downgrade,43210
TR,Upgrade,31440
DE,Upgrade,116985
PH,Upgrade,6175


In [0]:
us_customer_june = final_df.filter(
    (final_df.country_code.isin('US')) &
    (month(final_df.order_date) == 6) &
    (year(final_df.order_date) == 2023) 
)

In [0]:
display(us_customer_june)

acid,order_number,country_code,tier_after_purchase,tier_date,order_date,tier_before_purchase,action_before_purchase,neg_tier_movement
007S6K9UE7OVQWTS,AD182064073,US,1,2022-06-25,2023-06-18,1,No Change,
007S6K9UE7OVQWTS,AD182740729,US,1,2022-06-25,2023-06-24,1,No Change,
007S6K9UE7OVQWTS,AD182996645,US,1,2022-06-25,2023-06-27,1,No Change,
007S6K9UE7OVQWTS,AD183207967,US,1,2022-06-25,2023-06-29,1,No Change,
00AVXU2QTTTVYL6C,AD182749541,US,1,,2023-06-24,0,first_purchase,
00HFFPKCQXC1QYM3,AD904648941,US,1,,2023-06-01,0,first_purchase,
00HFFPKCQXC1QYM3,AD905052386,US,1,,2023-06-01,0,Upgrade,
00ZS4DAQ9QMCB9UF,AD905180915,US,1,,2023-06-01,0,first_purchase,
014QAPH8DYAMQV9Y,AD180937829,US,1,,2023-06-10,0,Upgrade,
01532RJ991XRJ0GU,AD905300673,US,2,2023-06-01,2023-06-02,2,No Change,


# Junkyard

**Define needed data sources**

In [0]:
members_loyalty_engagements = spark.table("cadp_consumer_sciences_lh.members_loyalty_engagements")
members_loyalty_tiers = spark.table("cadp_consumer_sciences_lh.members_loyalty_tiers")
consumer_profile = spark.table("cadp_consumer_sciences_lh.consumer_profile")
members_loyalty_expired_asset = spark.table("cadp_consumer_sciences_lh.members_loyalty_expired_asset")

**Implement 'tier_info' Logic**

In [0]:
tier_info = (members_loyalty_engagements
              .join(members_loyalty_tiers, members_loyalty_engagements["acid"] == members_loyalty_tiers["acid"], "left")
              .join(consumer_profile, members_loyalty_engagements["acid"] == consumer_profile["acid"], "left")
              .filter((col("order_number").isNotNull()) & 
                      (consumer_profile["loyalty_memberid"].isNotNull()) & 
                      (col("engagement_date") > "2018-01-01"))
              .groupBy(members_loyalty_engagements["acid"], "order_number", members_loyalty_tiers["country_code"])
              .agg(
                  max(members_loyalty_tiers["tier_details_id"]).alias("tier_after_purchase"),
                  max(when(col("tier_achieved_date") <= col("engagement_date"), members_loyalty_tiers["tier_details_id"])).alias("tier_before_purchase_raw"),
                  max(when(col("tier_achieved_date") <= col("engagement_date"), col("tier_achieved_date"))).alias("tier_date"),
                  max("engagement_date").alias("order_date")
              ))


**Calculate previous_order_date and expired_assets**

In [0]:
windowSpec = Window.partitionBy("acid").orderBy("order_date")

previous_order_date = (tier_info.withColumn("prev_order_date", lag("order_date").over(windowSpec))
                                 .withColumn("prev_tier_after_purchase", lag("tier_after_purchase").over(windowSpec)))

expired_assets = (previous_order_date
                  .join(members_loyalty_expired_asset, "acid", "left")
                  .filter((col("expiry_date") > col("prev_order_date")) & (col("expiry_date") <= col("order_date"))))


**Rank Orders and Select Final Columns**

In [0]:
ranked_orders = expired_assets.withColumn("rn", row_number().over(windowSpec))

final_df = (ranked_orders.select(
            col("acid"),
            col("order_number"),
            col("tier_after_purchase"),
            members_loyalty_tiers["country_code"].alias("country_code"),
            to_date(col("tier_date")).alias("tier_date"),
            to_date(col("order_date")).alias("order_date"),
            coalesce(col("tier_before_purchase_raw"), lit(0)).alias("tier_before_purchase")
            ))


'''final_df = (ranked_orders.select(
            col("acid"),
            "order_number",
            "tier_after_purchase",
            "country_code",
            to_date("tier_date").alias("tier_date"),
            to_date("order_date").alias("order_date"),
            coalesce("tier_before_purchase_raw", lit(0)).alias("tier_before_purchase")
            # Include additional logic for action_before_purchase and neg_tier_movement as needed
            ))'''


'final_df = (ranked_orders.select(\n            col("acid"),\n            "order_number",\n            "tier_after_purchase",\n            "country_code",\n            to_date("tier_date").alias("tier_date"),\n            to_date("order_date").alias("order_date"),\n            coalesce("tier_before_purchase_raw", lit(0)).alias("tier_before_purchase")\n            # Include additional logic for action_before_purchase and neg_tier_movement as needed\n            ))'

In [0]:
final_df.show()

+--------------------+--------------------+-------------------+------------+----------+----------+--------------------+
|                acid|        order_number|tier_after_purchase|country_code| tier_date|order_date|tier_before_purchase|
+--------------------+--------------------+-------------------+------------+----------+----------+--------------------+
|    0005N885XBEY3DFG|         ACL05010849|                  1|          CL|2022-06-23|2023-04-16|                   1|
|    0005N885XBEY3DFG|         ACL05010849|                  1|          CL|2022-06-23|2023-04-16|                   1|
|    0005N885XBEY3DFG|         ACL05010849|                  1|          CL|2022-06-23|2023-04-16|                   1|
|    001BFP2IR94IN20Q|         ADE50254392|                  1|          DE|2022-07-16|2022-09-29|                   1|
|    001BFP2IR94IN20Q|         ADE50254392|                  1|          DE|2022-07-16|2022-09-29|                   1|
|001FF5D5-0104-4A5...|         ANL085098

In [0]:
display(final_df)

acid,order_number,tier_after_purchase,country_code,tier_date,order_date,tier_before_purchase
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
001BFP2IR94IN20Q,ADE50254392,1,DE,2022-07-16,2022-09-29,1
001BFP2IR94IN20Q,ADE50254392,1,DE,2022-07-16,2022-09-29,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,310100000001010000000000000007902030102022,3,NL,2022-10-26,2022-10-30,2
001KKEH5RTN0FZDT,MX14102112450,3,MX,2023-08-01,2024-01-27,3


**Create or Replace Temporary View**

In [0]:
final_df.createOrReplaceTempView("tier_at_purchase")


In [0]:
specific_user_df = spark.sql("SELECT * FROM tier_at_purchase WHERE acid = '004K2KV4OEU0OCOB'")
specific_user_df.show()

+----------------+------------+-------------------+------------+----------+----------+--------------------+
|            acid|order_number|tier_after_purchase|country_code| tier_date|order_date|tier_before_purchase|
+----------------+------------+-------------------+------------+----------+----------+--------------------+
|004K2KV4OEU0OCOB| AUK64058757|                  2|          GB|2022-12-01|2023-05-10|                   2|
+----------------+------------+-------------------+------------+----------+----------+--------------------+



In [0]:
%sql
select * 
from tier_at_purchase
limit 10

acid,order_number,tier_after_purchase,country_code,tier_date,order_date,tier_before_purchase
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
0005N885XBEY3DFG,ACL05010849,1,CL,2022-06-23,2023-04-16,1
001BFP2IR94IN20Q,ADE50254392,1,DE,2022-07-16,2022-09-29,1
001BFP2IR94IN20Q,ADE50254392,1,DE,2022-07-16,2022-09-29,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,ANL08509861,3,NL,2022-04-25,2022-06-02,1
001FF5D5-0104-4A58-0B90-97F7B5B52902,310100000001010000000000000007902030102022,3,NL,2022-10-26,2022-10-30,2
001KKEH5RTN0FZDT,MX14102112450,3,MX,2023-08-01,2024-01-27,3


In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import max, col, lag, to_date, coalesce, row_number, when
from pyspark.sql.window import Window

# Initialize Spark Session (assuming it's already created)
spark = SparkSession.builder.appName("Loyalty Tier Analysis").getOrCreate()

# Assuming DataFrame representations of your SQL tables are already loaded or created, e.g.:
# members_loyalty_engagements, members_loyalty_tiers, consumer_profile, members_loyalty_expired_asset

members_loyalty_engagements = spark.table("cadp_consumer_sciences_lh.members_loyalty_engagements")
members_loyalty_tiers = spark.table("cadp_consumer_sciences_lh.members_loyalty_tiers")
consumer_profile = spark.table("cadp_consumer_sciences_lh.consumer_profile")
members_loyalty_expired_asset = spark.table("cadp_consumer_sciences_lh.members_loyalty_expired_asset")

# tier_info CTE

tier_info = (members_loyalty_engagements
              .join(members_loyalty_tiers, members_loyalty_engagements["acid"] == members_loyalty_tiers["acid"], "left")
              .join(consumer_profile, members_loyalty_engagements["acid"] == consumer_profile["acid"], "left")
              .filter((col("order_number").isNotNull()) & 
                      (consumer_profile["loyalty_memberid"].isNotNull()) & 
                      (col("engagement_date") > "2018-01-01"))
              .groupBy(members_loyalty_engagements["acid"], "order_number", members_loyalty_tiers["country_code"])
              .agg(
                  max(members_loyalty_tiers["tier_details_id"]).alias("tier_after_purchase"),
                  max(when(col("tier_achieved_date") <= col("engagement_date"), members_loyalty_tiers["tier_details_id"])).alias("tier_before_purchase_raw"),
                  max(when(col("tier_achieved_date") <= col("engagement_date"), col("tier_achieved_date"))).alias("tier_date"),
                  max("engagement_date").alias("order_date")
              ))


# previous_order_date CTE
windowSpec = Window.partitionBy("acid").orderBy("order_date")
previous_order_date = (tier_info.withColumn("prev_order_date", lag("order_date").over(windowSpec))
                                .withColumn("prev_tier_after_purchase", lag("tier_after_purchase").over(windowSpec)))



# expired_assets CTE
expired_assets = (previous_order_date
    .join(members_loyalty_expired_asset.alias("ea"), 
          (previous_order_date["acid"] == col("ea.acid")) & 
          (col("ea.expiry_date") > previous_order_date["prev_order_date"]) & 
          (col("ea.expiry_date") <= previous_order_date["order_date"]), "left")
    .select(previous_order_date["*"], col("ea.asset_type").alias("neg_tier_movement")))

# ranked_orders CTE
ranked_orders = (expired_assets.withColumn("rn", row_number().over(Window.partitionBy("acid").orderBy("order_date"))))

# Final DataFrame
final_df = (ranked_orders
    .select(
        "acid",
        "order_number",
        "tier_after_purchase",
        to_date("tier_date").alias("tier_date"),
        to_date("order_date").alias("order_date"),
        coalesce("tier_before_purchase_raw", lit(0)).alias("tier_before_purchase"),
        when(col("rn") == 1, "first_purchase")
        .when(coalesce("tier_before_purchase_raw", lit(0)) < coalesce("tier_after_purchase", lit(0)), "Upgrade")
        .when(coalesce("tier_before_purchase_raw", lit(0)) > coalesce("tier_after_purchase", lit(0)), "Downgrade")
        .when(coalesce("tier_before_purchase_raw", lit(0)) == coalesce("tier_after_purchase", lit(0)), "No Change")
        .otherwise("subsequent_purchase").alias("action_before_purchase"),
        max("neg_tier_movement").over(Window.partitionBy("acid", "order_number")).alias("neg_tier_movement")
    ))

# Now you can use final_df for further analysis or actions


In [0]:
final_df.createOrReplaceTempView("tier_at_purchase")

In [0]:
specific_user_df = spark.sql("SELECT * FROM tier_at_purchase WHERE acid = '004K2KV4OEU0OCOB'")
specific_user_df.show()

+----------------+--------------------+-------------------+----------+----------+--------------------+----------------------+-----------------+
|            acid|        order_number|tier_after_purchase| tier_date|order_date|tier_before_purchase|action_before_purchase|neg_tier_movement|
+----------------+--------------------+-------------------+----------+----------+--------------------+----------------------+-----------------+
|004K2KV4OEU0OCOB|44890000000101000...|                  2|2021-11-14|2022-04-13|                   2|             No Change|             NULL|
|004K2KV4OEU0OCOB|         AUK30094161|                  2|      NULL|2020-11-28|                   0|        first_purchase|             NULL|
|004K2KV4OEU0OCOB|         AUK31207140|                  2|      NULL|2020-12-20|                   0|               Upgrade|             NULL|
|004K2KV4OEU0OCOB|         AUK35386658|                  2|      NULL|2021-04-21|                   0|               Upgrade|           

In [0]:
user_df = specific_user_df.toPandas()

In [0]:
display(user_df)

acid,order_number,tier_after_purchase,tier_date,order_date,tier_before_purchase,action_before_purchase,neg_tier_movement
004K2KV4OEU0OCOB,448900000001010000000000000004230513042022,2,2021-11-14,2022-04-13,2,No Change,
004K2KV4OEU0OCOB,AUK30094161,2,,2020-11-28,0,first_purchase,
004K2KV4OEU0OCOB,AUK31207140,2,,2020-12-20,0,Upgrade,
004K2KV4OEU0OCOB,AUK35386658,2,,2021-04-21,0,Upgrade,
004K2KV4OEU0OCOB,AUK38693067,2,2021-04-23,2021-07-27,1,Upgrade,
004K2KV4OEU0OCOB,AUK38711593,2,2021-04-23,2021-07-27,1,Upgrade,
004K2KV4OEU0OCOB,AUK39164693,2,2021-04-23,2021-08-02,1,Upgrade,
004K2KV4OEU0OCOB,AUK43530463,2,2021-04-23,2021-11-13,1,Upgrade,
004K2KV4OEU0OCOB,AUK44125502,2,2021-11-14,2021-11-23,2,No Change,
004K2KV4OEU0OCOB,AUK47305665,2,2021-11-14,2022-02-18,2,No Change,


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assuming members_loyalty_engagements, members_loyalty_tiers, and consumer_profile are already loaded as DataFrames

# Correcting ambiguous reference by specifying the table alias explicitly in the join conditions
# First, filter the consumer_profile DataFrame for non-null loyalty_memberid
consumer_profile_filtered = consumer_profile.filter(consumer_profile.loyalty_memberid.isNotNull())

# Join operations, ensuring we use aliases and specify columns unambiguously
tier_info = members_loyalty_engagements.alias('l') \
    .join(members_loyalty_tiers.alias('t'), 'acid', 'left_outer') \
    .join(consumer_profile_filtered.alias('cp'), 'acid', 'inner') \
    .filter((F.col('l.order_number').isNotNull()) & (F.col('l.engagement_date') > '2018-01-01')) \
    .groupBy('l.acid', 'l.order_number') \
    .agg(
        F.max('l.tier_details_id').alias('tier_after_purchase'),
        F.max(F.when(F.col('t.tier_achieved_date') <= F.col('l.engagement_date'), F.col('t.tier_details_id'))).alias('tier_before_purchase_raw'),
        F.max(F.when(F.col('t.tier_achieved_date') <= F.col('l.engagement_date'), F.col('t.tier_achieved_date'))).alias('tier_date'),
        F.max('l.engagement_date').alias('order_date')
    )

# Window specification for calculating previous order date and tier
windowSpec = Window.partitionBy('acid').orderBy('order_date')

# Calculating previous order date and tier
previous_order_date = tier_info.withColumn('prev_order_date', F.lag('order_date').over(windowSpec)) \
    .withColumn('prev_tier_after_purchase', F.lag('tier_after_purchase').over(windowSpec))

# Joining with members_loyalty_expired_asset for expired_assets
expired_assets = previous_order_date.alias('p') \
    .join(members_loyalty_expired_asset.alias('ea'), (F.col('p.acid') == F.col('ea.acid')) & (F.col('ea.expiry_date') > F.col('p.prev_order_date')) & (F.col('ea.expiry_date') <= F.col('p.order_date')), 'left_outer') \
    .select('p.acid', 'p.order_number', 'p.order_date', 'p.prev_order_date', 'p.tier_after_purchase', 'p.tier_before_purchase_raw', 'p.tier_date', 'p.prev_tier_after_purchase', F.col('ea.asset_type').alias('neg_tier_movement'))

# Assigning row numbers to orders
ranked_orders = expired_assets.withColumn('rn', F.row_number().over(windowSpec))

# Final selection and transformation
final_df = ranked_orders.select(
    'acid',
    'order_number',
    'tier_after_purchase',
    F.to_date('tier_date').alias('tier_date'),
    F.to_date('order_date').alias('order_date'),
    F.coalesce('tier_before_purchase_raw', F.lit(0)).alias('tier_before_purchase'),
    F.when(F.col('rn') == 1, 'first_purchase')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) < F.coalesce('tier_after_purchase', F.lit(0)), 'Upgrade')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) > F.coalesce('tier_after_purchase', F.lit(0)), 'Downgrade')
    .when(F.coalesce('tier_before_purchase_raw', F.lit(0)) == F.coalesce('tier_after_purchase', F.lit(0)), 'No Change')
    .otherwise('subsequent_purchase').alias('action_before_purchase'),
    F.max('neg_tier_movement').over(Window.partitionBy('acid', 'order_number')).alias('neg_tier_movement')
)

# Optional: Creating a temporary view for SQL-like querying
final_df.createOrReplaceTempView("loyalty_engage_tier_v15")





In [0]:
specific_user_df = spark.sql("SELECT * FROM loyalty_engage_tier_v15 WHERE acid = '004K2KV4OEU0OCOB'")
specific_user_df.show()

+----------------+--------------------+-------------------+----------+----------+--------------------+----------------------+-----------------+
|            acid|        order_number|tier_after_purchase| tier_date|order_date|tier_before_purchase|action_before_purchase|neg_tier_movement|
+----------------+--------------------+-------------------+----------+----------+--------------------+----------------------+-----------------+
|004K2KV4OEU0OCOB|44890000000101000...|                  2|2021-11-14|2022-04-13|                   2|             No Change|             NULL|
|004K2KV4OEU0OCOB|         AUK30094161|                  0|      NULL|2020-11-28|                   0|        first_purchase|             NULL|
|004K2KV4OEU0OCOB|         AUK31207140|                  0|      NULL|2020-12-20|                   0|             No Change|             NULL|
|004K2KV4OEU0OCOB|         AUK35386658|                  1|      NULL|2021-04-21|                   0|               Upgrade|           

In [0]:
display(specific_user_df)

acid,order_number,tier_after_purchase,tier_date,order_date,tier_before_purchase,action_before_purchase,neg_tier_movement
004K2KV4OEU0OCOB,448900000001010000000000000004230513042022,2,2021-11-14,2022-04-13,2,No Change,
004K2KV4OEU0OCOB,AUK30094161,0,,2020-11-28,0,first_purchase,
004K2KV4OEU0OCOB,AUK31207140,0,,2020-12-20,0,No Change,
004K2KV4OEU0OCOB,AUK35386658,1,,2021-04-21,0,Upgrade,
004K2KV4OEU0OCOB,AUK38693067,1,2021-04-23,2021-07-27,1,No Change,
004K2KV4OEU0OCOB,AUK38711593,1,2021-04-23,2021-07-27,1,No Change,
004K2KV4OEU0OCOB,AUK39164693,1,2021-04-23,2021-08-02,1,No Change,
004K2KV4OEU0OCOB,AUK43530463,2,2021-04-23,2021-11-13,1,Upgrade,
004K2KV4OEU0OCOB,AUK44125502,2,2021-11-14,2021-11-23,2,No Change,
004K2KV4OEU0OCOB,AUK47305665,2,2021-11-14,2022-02-18,2,No Change,
