In [0]:
from pyspark.sql.functions import col, explode, when, count, col, lit, current_date
from pyspark.sql.types import BooleanType, IntegerType

In [0]:
df= spark.sql(
  """
  SELECT * FROM processed.ct.customer
  """
)

In [0]:
df_with_benefits = df.withColumn("benefit", explode("benefits"))
df_joined = df.alias("d").join(df_with_benefits.alias("b"), df.id == df_with_benefits.id, "outer")



In [0]:
df_grouped = df_joined\
    .groupby(col("d.id").alias("customer_bk"), col("d.user_name"), col("d.name"), col("d.address"), col("d.birth_date"), col("d.email"), col("d.active_customer").cast(BooleanType()))\
    .agg(
        when(count(when(col("d.tier") == "Bronze", True)) > 0, True).otherwise(False).alias("bronze_tier"),
        when(count(when(col("d.tier") == "Silver", True)) > 0, True).otherwise(False).alias("silver_tier"),
        when(count(when(col("d.tier") == "Gold", True)) > 0, True).otherwise(False).alias("gold_tier"),
        when(count(when(col("d.tier") == "Platinum", True)) > 0, True).otherwise(False).alias("platinum_tier"),
        when(count(when(col("benefit")=="24 hour dedicated line", True))>0, True).otherwise(False).alias("24_hour_dedicated_line_benefit"),
        when(count(when(col("benefit") == "concert tickets", True)) > 0, True).otherwise(False).alias("concert_ticket_benefit"),
        when(count(when(col("benefit") == "travel insurance", True)) > 0, True).otherwise(False).alias("travel_insurance_benefit"),
        when(count(when(col("benefit") == "financial planning assistance", True)) > 0, True).otherwise(False).alias("financial_planning_assistance_benefit"),
        when(count(when(col("benefit") == "shopping discounts", True)) > 0, True).otherwise(False).alias("shopping_discounts_benefit"),
        when(count(when(col("benefit") == "sports tickets", True)) > 0, True).otherwise(False).alias("sports_tickets_benefit"),
        when(count(when(col("benefit") == "concierge services", True)) > 0, True).otherwise(False).alias("concierge_service_benefit"),
        when(count(when(col("benefit") == "car rental insurance", True)) > 0, True).otherwise(False).alias("car_rental_benefit"),
        when(count(when(col("benefit") == "airline lounge access", True)) > 0, True).otherwise(False).alias("airline_lounge_access_benefit"),
        when(count(when(col("benefit") == "dedicated account representative", True)) > 0, True).otherwise(False).alias("dedicated_account_representative")
        
        )   


#### Managing Slowly Changing Dimension Type 2!

In [0]:
 
df_updated = df_grouped\
    .withColumn("start_date", lit(current_date()))\
    .withColumn("end_date", lit(None))\
    .withColumn("is_current", lit(True))\
    .withColumn("source_system", lit("mongoDB_1"))
df_current = spark.sql(
    """
    SELECT * EXCEPT (customer_sk)
    FROM presentation.ct.dim_customer
    WHERE is_current = 1
    """
)

if df_current.count() != 0:
    df_upd_j_cur = df_updated.alias("upd").join(df_current.alias("cur"), df_updated.customer_bk == df_current.customer_bk, "outer")

    df_hist_records = df_upd_j_cur\
        .filter(col("cur.customer_bk").isNotNull())\
        .filter(col("upd.address") != col("cur.address"))\
        .select("cur.*")\
        .withColumn("is_current", lit(False))\
        .withColumn("end_date", lit(current_date()))
        

    id_list = [row.customer_bk for row in df_hist_records.select("cur.customer_bk").collect()]

    df_new_records = df_upd_j_cur\
        .withColumn("upd.start_date", when((~col("upd.customer_bk").isin(id_list)) & (col("cur.start_date") != 'null') , col("cur.start_date")))\
        .select("upd.*")

    df_final = df_new_records.union(df_hist_records)

    df_final.createOrReplaceTempView("source_data")
else:
    df_final = df_updated
    df_final.createOrReplaceTempView("source_data")



In [0]:
%sql
MERGE INTO presentation.ct.dim_customer AS trg
USING source_data AS src
ON src.customer_bk = trg.customer_bk AND src.is_current = 1
WHEN MATCHED THEN UPDATE
SET
  customer_bk = src.customer_bk,
  name = src.name,
  user_name = src.user_name,
  address = src.address,
  birth_date = src.birth_date,
  email = src.email,
  active_customer = src.active_customer,
  bronze_tier = src.bronze_tier,
  silver_tier = src.silver_tier,
  gold_tier = src.gold_tier,
  platinum_tier = src.platinum_tier,
  24_hour_dedicated_line_benefit = src.24_hour_dedicated_line_benefit,
  concert_ticket_benefit = src.concert_ticket_benefit,
  travel_insurance_benefit = src.travel_insurance_benefit,
  financial_planning_assistance_benefit = src.financial_planning_assistance_benefit,
  shopping_discounts_benefit = src.shopping_discounts_benefit,
  sports_tickets_benefit = src.sports_tickets_benefit,
  concierge_service_benefit = src.concierge_service_benefit,
  car_rental_benefit = src.car_rental_benefit,
  airline_lounge_access_benefit = src.airline_lounge_access_benefit,
  dedicated_account_representative = src.dedicated_account_representative,
  start_date = src.start_date,
  end_date = src.end_date,
  is_current = src.is_current,
  source_system = src.source_system
WHEN NOT MATCHED THEN 
INSERT(
  customer_bk,
  name,
  user_name,
  address,
  birth_date,
  email,
  active_customer,
  bronze_tier,
  silver_tier,
  gold_tier,
  platinum_tier,
  24_hour_dedicated_line_benefit,
  concert_ticket_benefit,
  travel_insurance_benefit,
  financial_planning_assistance_benefit,
  shopping_discounts_benefit,
  sports_tickets_benefit,
  concierge_service_benefit,
  car_rental_benefit,
  airline_lounge_access_benefit,
  dedicated_account_representative,
  start_date,
  end_date,
  is_current,
  source_system
)
VALUES(
  src.customer_bk,
  src.name,
  src.user_name,
  src.address,
  src.birth_date,
  src.email,
  src.active_customer,
  src.bronze_tier,
  src.silver_tier,
  src.gold_tier,
  src.platinum_tier,
  src.24_hour_dedicated_line_benefit,
  src.concert_ticket_benefit,
  src.travel_insurance_benefit,
  src.financial_planning_assistance_benefit,
  src.shopping_discounts_benefit,
  src.sports_tickets_benefit,
  src.concierge_service_benefit,
  src.car_rental_benefit,
  src.airline_lounge_access_benefit,
  src.dedicated_account_representative,
  src.start_date,
  src.end_date,
  src.is_current,
  src.source_system
)


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
501,500,0,1
