In [None]:
import datetime
import re

from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType
from pyspark.sql.window import Window


In [None]:
# 1) General functions

In [None]:

def build_master_internal(cmd, loyalty, sales_features, coolers, sfe, red, data_extraction_date="2020-05-30", id_cust="CUSTOMER_ID"):
  """This function combines pre-processed internal data into a single master file on customer level.

  Every join is a left outer join starting from `cmd`, so all cmd rows are kept and customers
  missing from a source get nulls for that source's features. The cooler, sfe and red inputs
  are aggregated to customer level inside this function, using the same `id_cust` key.

  Arguments:
      cmd {pyspark.sql.DataFrame} -- A spark dataframe that contains cmd data on Marrakech city
      loyalty {pyspark.sql.DataFrame} -- A spark dataframe that contains customer loyalty data (joined on CUSTOMER_COD and id_cust)
      sales_features {pyspark.sql.DataFrame} -- A spark dataframe that contains engineered sales features, on customer level
      coolers {pyspark.sql.DataFrame} -- A spark dataframe that contains raw cooler data; aggregated here via coolers_to_customer_count
      sfe {pyspark.sql.DataFrame} -- A spark dataframe that contains raw sfe data; aggregated here via sfe_to_customer_total (monthly means per customer)
      red {pyspark.sql.DataFrame} -- A spark dataframe that contains raw red data; aggregated here via red_to_customer_total (weekly means per customer)
      data_extraction_date {str} -- String to indicate a common date of extraction - of the type "2020-05-30"
      id_cust {str} -- Name of customer id variable to use; forwarded to every aggregation helper

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe that contains the combined internal data.
  """
  # loyalty is keyed on CUSTOMER_COD plus the customer id; avoid listing the same key twice
  loyalty_keys = ["CUSTOMER_COD"] + ([id_cust] if id_cust != "CUSTOMER_COD" else [])

  # customer-level aggregates, all keyed on the same id column as the joins below
  coolers_cust = coolers_to_customer_count(coolers, id_cust=id_cust)
  sfe_cust = sfe_to_customer_total(sfe, data_extraction_date=data_extraction_date, id_cust=id_cust)
  red_cust = red_to_customer_total(red, data_extraction_date=data_extraction_date, id_cust=id_cust)

  master = (
    cmd
    .join(loyalty, on=loyalty_keys, how="left_outer")
    .join(sales_features, on=[id_cust], how="left_outer")
    .join(coolers_cust, on=[id_cust], how="left_outer")
    .join(sfe_cust, on=[id_cust], how="left_outer")
    .join(red_cust, on=[id_cust], how="left_outer")
  )

  return master

In [None]:

def expand_master_2(master_internal, taa, seasonal, hybrids, id_var="CUSTOMER_ID"):
  """This function combines the internal master data with external sources - into a master file on customer level.

  All joins are left joins starting from `master_internal`. The trade area analysis frame's
  LATITUDE / LONGITUDE columns are renamed to LATITUDE_TAA / LONGITUDE_TAA before joining so
  they do not collide with master columns of the same name.

  Arguments:
      master_internal {pyspark.sql.DataFrame} -- A spark dataframe that contains the customer level internal master data
      taa {pyspark.sql.DataFrame} -- A spark dataframe that contains customer level trade area analysis output (joined on id_var and CHANNEL_CUSTOM)
      seasonal {pyspark.sql.DataFrame} -- A spark dataframe that contains seasonality features (joined on id_var)
      hybrids {pyspark.sql.DataFrame} -- A spark dataframe that contains AG hybrid features (joined on id_var)
      id_var {str} -- Name of customer id

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe combining internal and external data on customer level.
  """
  taa_renamed = taa. \
    withColumnRenamed("LATITUDE", "LATITUDE_TAA"). \
    withColumnRenamed("LONGITUDE", "LONGITUDE_TAA")

  # add external features to the internal master set
  master_expanded = (
    master_internal
    .join(taa_renamed, on=[id_var, "CHANNEL_CUSTOM"], how="left")
    .join(seasonal, on=[id_var], how="left")
    .join(hybrids, on=[id_var], how="left")
  )

  return master_expanded


In [None]:
def expand_master(master_internal, ta_matched_geo, ta_matched_no_geo, poi_matched):
  """This function combines the internal master data with external sources - into a master file on customer level.

  Trip advisor matches from the geo and non-geo sets are stacked. When a customer id
  (L_CUSTOMER_ID) occurs in both, the geo match is kept and the non-geo match is dropped.
  Trip advisor columns get the suffix "_MATCHED_TA" and point of interest columns get
  "_MATCHED_POI". Both are left-joined to the master on CUSTOMER_ID.

  Arguments:
      master_internal {pyspark.sql.DataFrame} -- A spark dataframe that contains the customer level internal master data
      ta_matched_geo {pyspark.sql.DataFrame} -- A spark dataframe that contains customer level trip advisor features with included geo match
      ta_matched_no_geo {pyspark.sql.DataFrame} -- A spark dataframe that contains customer level trip advisor features without geo match
      poi_matched {pyspark.sql.DataFrame} -- A spark dataframe that contains customer level point of interest features

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe combining internal and external data on customer level.
  """
  # cust id-s are repeated in the 2 sets - currently dropping repeats in non-geo matches, consider whether to preserve both matchings instead.
  # A left-anti join removes them inside Spark instead of collecting every repeated id to the driver
  # and filtering with a (potentially huge) isin() list. Joining on a column list can move the key
  # column to the front, so the original column order is restored for the positional union below.
  # (Rows with a null L_CUSTOMER_ID survive the anti join but can never match the master join anyway.)
  ta_no_geo_only = ta_matched_no_geo. \
    join(ta_matched_geo.select("L_CUSTOMER_ID"), on=["L_CUSTOMER_ID"], how="left_anti"). \
    select(*ta_matched_no_geo.columns)

  # NOTE(review): union is positional, so both trip advisor sets must share the same column order;
  # duplicate L_CUSTOMER_IDs inside ta_matched_geo itself would still duplicate master rows.
  ta_matched = stamp_feature_source(ta_matched_geo.union(ta_no_geo_only), "_MATCHED_TA")
  poi_stamped = stamp_feature_source(poi_matched, "_MATCHED_POI")

  # add external features to the internal master set
  master_expanded = (
    master_internal
    .join(ta_matched, master_internal.CUSTOMER_ID == ta_matched.L_CUSTOMER_ID_MATCHED_TA, how="left")
    .join(poi_stamped, master_internal.CUSTOMER_ID == poi_stamped.CUSTOMER_ID_MATCHED_POI, how="left")
  )

  return master_expanded



In [None]:
def clean_from_junk_cols(data, thresh=0.8, regex="Age_Group_"):
  """This function cleans a dataset from unusable columns - with no meaningful interpretation or too many missing values.

  A column is dropped when either:
    1) its name matches `regex` (re.search semantics, i.e. the pattern may match anywhere in the name), or
    2) its count of null values (plus NaN values, for float/double columns) is >= thresh * row count.

  NOTE(review): with an empty dataframe every column satisfies 2) (0 >= 0) and is dropped.

  Arguments:
      data {pyspark.sql.DataFrame} -- A spark dataframe to be cleaned up
      thresh {float} -- Threshold value for null/nan count @ and above which to drop features (defaults to 0.8)
      regex {str} -- Regular expression to use in excluding groups of variables by strings contained in their names

  Returns:
      pyspark.sql.DataFrame -- A cleaned spark dataframe.
  """
  if not data.columns:
    return data

  df_nrows = data.count()

  # 1) Define columns with no meaningful interpretation - to be dropped.
  #    Matching on the column names only; no need to pull the data to the driver.
  exclude = [c for c in data.columns if re.search(regex, c)]

  # 2) Define columns with null/nan count above acceptable threshold - to be dropped.
  #    isnan only applies to float/double columns (it fails on e.g. date/boolean columns).
  col_types = dict(data.dtypes)
  missing_exprs = []
  for c in data.columns:
    is_missing = F.col(c).isNull()
    if col_types[c] in ("float", "double"):
      is_missing = is_missing | F.isnan(F.col(c))
    missing_exprs.append(F.count(F.when(is_missing, F.lit(1))).alias(c))

  missing_counts = data.select(missing_exprs).first().asDict()
  drop = [c for c, n_missing in missing_counts.items() if n_missing >= df_nrows * thresh]

  # 3) Clean data from 1) [exclude] and 2) [drop] columns
  return data.drop(*(drop + exclude))

In [None]:
def stamp_feature_source(data, suffix):
  """This function takes a dataframe and adds a suffix to all column names.

  Columns are renamed positionally with toDF, so names containing characters that Spark's
  column-name parser treats specially (such as "." or backticks) are renamed correctly.
  Resolving them through F.col(name) would fail on such names.

  Arguments:
      data {pyspark.sql.DataFrame} -- A spark dataframe to operate on.
      suffix {str} -- suffix to add to column names.

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe with extended column names.
  """
  return data.toDF(*[name + suffix for name in data.columns])


In [1]:
def derive_spark_week(spark_data, date_var="DATE_ISO"):
  """This function takes a dataframe with a yyyyMMdd date column and adds week variables,
  later to be used in aggregation manipulations.

  Weeks run Monday to Sunday. The (possibly partial) week that contains the earliest observed
  date is week_index 1, and the index increases by one at every following Monday.

  Arguments:
      spark_data {pyspark.sql.DataFrame} -- A spark dataframe to use
      date_var {string} -- Name of the date column (values such as 20200530, parsed with '%Y%m%d');
          used both for the week derivation and as join key

  Returns:
      pyspark.sql.DataFrame -- The input data left-joined with a calendar that adds
          week_index (week number counted from the earliest observed date, starting at 1) and
          seasonal_week_index (zero-padded ISO 8601 week number as a string, e.g. "05").

  Raises:
      ValueError -- if date_var contains no non-null value.
  """
  # only the two bounds travel to the driver; min/max ignore null dates
  bounds = spark_data.agg(F.min(date_var).alias("start"), F.max(date_var).alias("end")).first()
  if bounds is None or bounds["start"] is None:
    raise ValueError("derive_spark_week: column {} contains no non-null dates".format(date_var))

  start_date = datetime.datetime.strptime(str(bounds["start"]), '%Y%m%d')
  end_date = datetime.datetime.strptime(str(bounds["end"]), '%Y%m%d')

  print("The start date of the data is {} and it will be considered as week index 1, the end date is {} and will be considered the last week index".
        format(start_date.strftime('/%Y/%m/%d'), end_date.strftime('/%Y/%m/%d')))

  # Monday of the week containing start_date anchors week_index 1
  first_monday = start_date - datetime.timedelta(days=start_date.weekday())

  calendar_rows = []
  for offset in range((end_date - start_date).days + 1):
    day = start_date + datetime.timedelta(days=offset)
    calendar_rows.append((
      (day - first_monday).days // 7 + 1,
      day.year * 10000 + day.month * 100 + day.day,
      "{:02d}".format(day.isocalendar()[1]),  # ISO 8601 week, same text as strftime("%V") without relying on the platform strftime
    ))

  # explicit column names instead of Row kwargs, whose field order differs across Spark versions;
  # the active session is used rather than a notebook-global `spark`
  calendar = SparkSession.builder.getOrCreate().createDataFrame(
    calendar_rows, ["week_index", date_var, "seasonal_week_index"])

  return spark_data.join(calendar, on=date_var, how="left")


In [None]:
def get_time_elapsed(data, time_spec, data_extraction_date):
  """This function adds, for one dataset spec, the days elapsed between each customer's first / last
  occurrence and the extraction date.

  The source columns date_customer_first_<time_spec> and date_customer_last_<time_spec> are cast
  to string and parsed with the "yyyyMMdd" pattern. The new columns
  time_since_first_<time_spec>_days and time_since_last_<time_spec>_days are computed as
  datediff(extraction date, occurrence date).

  Arguments:
      data {pyspark.sql.DataFrame} -- A spark dataframe to establish temporal features for
      time_spec {str} -- String to indicate dataset-specific spec (i.e. "sfe_visit", "red_score")
      data_extraction_date {str} -- String to indicate a dataset's date of extraction - of the type "2020-05-30"

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe expanded with time-elapsed-features.
  """
  extraction_day = F.to_date(F.lit(data_extraction_date))

  # (source column prefix, target column prefix), processed first then last
  prefix_pairs = (
    ("date_customer_first_", "time_since_first_"),
    ("date_customer_last_", "time_since_last_"),
  )

  for source_prefix, target_prefix in prefix_pairs:
    occurrence_day = F.to_date(F.col(source_prefix + time_spec).cast(StringType()), "yyyyMMdd")
    data = data.withColumn(target_prefix + time_spec + "_days", F.datediff(extraction_day, occurrence_day))

  return data


In [None]:
# 2) Cooler functions 

In [None]:
def coolers_to_customer_count(coolers, id_cust="CUSTOMER_ID"):
  """This function aggregates the cooler data to customer level.

  Per customer it produces:
    - CUSTOMER_TOTAL_COUNT_COOLERS -- count of non-null COOLER_COD
    - CUSTOMER_COUNT_COOLER_DOORS_<n> -- number of coolers with <n> doors (one column per door type)
    - date_customer_first_cooler / date_customer_last_cooler -- min / max PLACEMENT_DATE
    - CUSTOMER_EARLIEST_DATE_SUM_COOLER_DOORS / CUSTOMER_LATEST_DATE_SUM_COOLER_DOORS -- summed NUM_DOORS
      of the coolers placed on the customer's earliest / latest placement date
    - CUSTOMER_EARLIEST_DATE_COOLER_COUNT / CUSTOMER_LATEST_DATE_COOLER_COUNT -- number of coolers
      placed on those dates

  Arguments:
      coolers {pyspark.sql.DataFrame} -- A spark dataframe that contains the raw cooler data
      id_cust {str} -- Name of customer id variable to use

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe with cooler data aggregated to customer level.
  """
  # set specs
  date_col = "PLACEMENT_DATE"
  value_cols = ["COOLER_COD"]
  door_prefix = "CUSTOMER_COUNT_COOLER_DOORS_"

  # perform init prep (helper flags); forward id_cust so its window partition matches the grouping key
  prepped = cooler_init_prep(coolers, id_cust=id_cust)

  # the door dummies were already created by cooler_init_prep; reuse them instead of
  # running a second distinct().collect() job over the raw data
  door_cols = [c for c in prepped.columns if c.startswith(door_prefix)]

  # aggregate to customer level
  cool_cust = prepped. \
    groupBy(id_cust). \
    agg(*(
      [F.count(x).alias("CUSTOMER_TOTAL_COUNT_COOLERS") for x in prepped.columns if x in value_cols]
      + [F.sum(x).alias(x) for x in door_cols]
      + [F.min(date_col).alias("date_customer_first_cooler"), F.max(date_col).alias("date_customer_last_cooler")]
      + [F.sum("COOLER_EARLIEST_NUM_DOORS").alias("CUSTOMER_EARLIEST_DATE_SUM_COOLER_DOORS")]
      + [F.sum("COOLER_LATEST_NUM_DOORS").alias("CUSTOMER_LATEST_DATE_SUM_COOLER_DOORS")]
      + [F.count("CUSTOMER_EARLIEST_DATE_COOLER_COUNT").alias("CUSTOMER_EARLIEST_DATE_COOLER_COUNT")]
      + [F.count("CUSTOMER_LATEST_DATE_COOLER_COUNT").alias("CUSTOMER_LATEST_DATE_COOLER_COUNT")]
    ))

  return cool_cust


In [None]:
def cooler_init_prep(coolers, id_cust="CUSTOMER_ID"):
  """This function takes the raw cooler data and expands it with flag-like features
  to be leveraged in the aggregation stage.

  Only CUSTOMER_ID, CUSTOMER_COD, COMPANY_COD, NUM_DOORS, PLACEMENT_DATE and COOLER_COD
  (plus id_cust, if it is a different column) are kept from the raw data. Added columns:
    - CUSTOMER_COUNT_COOLER_DOORS_<n> -- 1 if the row's NUM_DOORS equals <n>, else 0; one per distinct
      NUM_DOORS value ("." replaced by "_"; a null NUM_DOORS gets the "_None" column)
    - sfe_latest_date / sfe_earliest_date -- latest / earliest PLACEMENT_DATE of the row's customer
      (despite the "sfe_" prefix, these hold cooler placement dates)
    - sfe_latest_flag / sfe_earliest_flag -- 1 if the row's PLACEMENT_DATE equals that date, else 0
    - COOLER_EARLIEST_NUM_DOORS / COOLER_LATEST_NUM_DOORS -- NUM_DOORS on earliest / latest-date rows, null otherwise
    - CUSTOMER_EARLIEST_DATE_COOLER_COUNT / CUSTOMER_LATEST_DATE_COOLER_COUNT -- 1 on earliest / latest-date rows, null otherwise

  Arguments:
      coolers {pyspark.sql.DataFrame} -- A spark dataframe that contains the raw cooler data
      id_cust {str} -- Name of customer id variable to use

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe with the expanded cooler data.
  """
  # set specs
  date_col = "PLACEMENT_DATE"

  # scope door dummies; eqNullSafe so that rows with a null NUM_DOORS are flagged in their
  # "_None" dummy (plain == against None evaluates to null and always fell through to 0)
  door_types = coolers.select(F.col("NUM_DOORS")).distinct().rdd.flatMap(lambda x: x).collect()
  types_expr = [
    F.when(F.col("NUM_DOORS").eqNullSafe(x), 1).otherwise(0).alias("CUSTOMER_COUNT_COOLER_DOORS_" + str(x).replace(".", "_"))
    for x in door_types
  ]

  # add dummies to original data; keep id_cust even when it is not one of the standard columns,
  # since the window below partitions on it
  keep_cols = ["CUSTOMER_ID", "CUSTOMER_COD", "COMPANY_COD", "NUM_DOORS", date_col, "COOLER_COD"]
  if id_cust not in keep_cols:
    keep_cols = [id_cust] + keep_cols
  coolers = coolers.select(*(keep_cols + types_expr))

  # add more flags to leverage in the aggregation step (whole-partition window per customer)
  w = Window. \
    partitionBy(coolers[id_cust]). \
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

  coolers = coolers. \
    withColumn("sfe_latest_date", F.max(coolers[date_col]).over(w)). \
    withColumn("sfe_earliest_date", F.min(coolers[date_col]).over(w))

  coolers = coolers. \
    withColumn("sfe_latest_flag", F.when(coolers["sfe_latest_date"] == coolers[date_col], F.lit(1)).otherwise(F.lit(0))). \
    withColumn("sfe_earliest_flag", F.when(coolers["sfe_earliest_date"] == coolers[date_col], F.lit(1)).otherwise(F.lit(0)))

  coolers = coolers. \
    withColumn("COOLER_EARLIEST_NUM_DOORS", F.when(coolers["sfe_earliest_flag"] == 1, coolers["NUM_DOORS"])). \
    withColumn("COOLER_LATEST_NUM_DOORS", F.when(coolers["sfe_latest_flag"] == 1, coolers["NUM_DOORS"])). \
    withColumn("CUSTOMER_EARLIEST_DATE_COOLER_COUNT", F.when(coolers["sfe_earliest_flag"] == 1, F.lit(1))). \
    withColumn("CUSTOMER_LATEST_DATE_COOLER_COUNT", F.when(coolers["sfe_latest_flag"] == 1, F.lit(1)))

  return coolers


In [None]:
# 3) SFE functions


In [None]:
def sfe_to_customer_total(sfe, data_extraction_date="2020-05-30", id_cust="CUSTOMER_ID"):
  """This function aggregates the sfe data to customer level.

  The value columns SCHEDULED, SCANNED, VISITED and SALE (those present) are processed in two steps:
    1) They are summed per customer and month. The month is the first 6 characters of VISIT_DATE.
    2) Those monthly sums are averaged per customer into CUSTOMER_MONTHLY_MEAN_<column>.
  Months without any sfe row for a customer do not enter that customer's mean. The first and last
  VISIT_DATE per customer are kept, and the time-elapsed features are added via get_time_elapsed.

  Arguments:
      sfe {pyspark.sql.DataFrame} -- A spark dataframe that contains the raw sfe data
      data_extraction_date {str} -- String to indicate a dataset's date of extraction - of the type "2020-05-30"
      id_cust {str} -- Name of customer id variable to use

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe with sfe data aggregated to customer level.
  """
  time_spec = "sfe_visit"
  wanted = ["SCHEDULED", "SCANNED", "VISITED", "SALE"]
  measures = [name for name in sfe.columns if name in wanted]

  # step 1: customer x month totals
  monthly_exprs = [F.sum(name).alias(name) for name in measures]
  monthly_exprs += [
    F.min(F.col("VISIT_DATE")).alias("date_customer_first_sfe_visit"),
    F.max(F.col("VISIT_DATE")).alias("date_customer_last_sfe_visit"),
  ]
  per_month = (
    sfe
    .withColumn("YM", F.col("VISIT_DATE").substr(1, 6))
    .groupBy(id_cust, "YM")
    .agg(*monthly_exprs)
  )

  # step 2: average of the monthly totals per customer
  customer_exprs = [F.mean(name).alias("CUSTOMER_MONTHLY_MEAN_" + name) for name in measures]
  customer_exprs += [
    F.min(F.col("date_customer_first_sfe_visit")).alias("date_customer_first_sfe_visit"),
    F.max(F.col("date_customer_last_sfe_visit")).alias("date_customer_last_sfe_visit"),
  ]
  per_customer = per_month.groupBy(id_cust).agg(*customer_exprs)

  return get_time_elapsed(per_customer, time_spec=time_spec, data_extraction_date=data_extraction_date)


In [None]:
# 4) RED functions


In [None]:
def red_to_customer_total(red, data_extraction_date="2020-05-30", id_cust="CUSTOMER_ID"):
  """This function aggregates the red data to customer level.

  For every distinct non-null CRITERE_RED value two per-row columns are built:
    - RED_SCORE_FOR_CRITERE_<value> -- SCORE_RED on rows of that criterion, 0 otherwise
    - RED_SCORE_POND_FOR_CRITERE_<value> -- RED_SCORE_POND on rows of that criterion, 0 otherwise
  These are averaged per customer and week_index (see derive_spark_week). The weekly means are
  then averaged per customer into MEAN_WEEKLY_<column>. The first and last DATE_ISO per customer
  and the matching time-elapsed features are added as well.

  Arguments:
      red {pyspark.sql.DataFrame} -- A spark dataframe that contains the raw red data
      data_extraction_date {str} -- String to indicate a dataset's date of extraction - of the type "2020-05-30"
      id_cust {str} -- Name of customer id variable to use

  Returns:
      pyspark.sql.DataFrame -- A spark dataframe with red data aggregated to customer level.
  """
  # starting specs
  time_spec = "red_score"

  # distinct criteria: nulls are skipped (they cannot form a column name and used to crash the
  # string concatenation), values are stringified for naming, and the list is sorted so the
  # output column order does not depend on Spark's distinct() ordering
  criteria = sorted(
    (value for value in red.select(F.col("CRITERE_RED")).distinct().rdd.flatMap(lambda x: x).collect() if value is not None),
    key=str,
  )

  # column names
  score_cols = ["RED_SCORE_FOR_" + "CRITERE_" + str(value) for value in criteria]
  score_pond_cols = ["RED_SCORE_POND_FOR_" + "CRITERE_" + str(value) for value in criteria]
  metric_cols = set(score_cols + score_pond_cols)

  # create 0/1 dummies to leverage in the aggregation
  types_scores = [F.when(F.col("CRITERE_RED") == value, 1).otherwise(0).alias(name) for value, name in zip(criteria, score_cols)]
  types_scores_pond = [F.when(F.col("CRITERE_RED") == value, 1).otherwise(0).alias(name) for value, name in zip(criteria, score_pond_cols)]

  # add week -> needed for mean weekly calculations
  red = derive_spark_week(
    red.select(*(red.columns + types_scores + types_scores_pond)),
    date_var="DATE_ISO")

  # populate dummies with meaningful data as per availability by red criteria
  for each in score_cols:
    red = red.withColumn(each, F.col("SCORE_RED") * F.col(each))

  for each in score_pond_cols:
    red = red.withColumn(each, F.col("RED_SCORE_POND") * F.col(each))

  # aggregate by customer + week
  # NOTE(review): rows of other criteria hold 0 (not null) in each criterion column, so a week that
  # mixes criteria averages a criterion's scores together with those zeros -- confirm this is intended.
  red_cust = red. \
    groupBy(id_cust, "week_index"). \
    agg(*(
      [F.mean(x).alias(x) for x in red.columns if x in metric_cols]
      + [F.min(F.col("DATE_ISO")).alias("date_customer_first_red_score"),
         F.max(F.col("DATE_ISO")).alias("date_customer_last_red_score")]
    ))

  # aggregate by customer only
  red_cust = red_cust. \
    groupBy(id_cust). \
    agg(*(
      [F.mean(x).alias("MEAN_WEEKLY_" + x) for x in red_cust.columns if x in metric_cols]
      + [F.min(F.col("date_customer_first_red_score")).alias("date_customer_first_red_score"),
         F.max(F.col("date_customer_last_red_score")).alias("date_customer_last_red_score")]
    ))

  # add time elapsed vars
  red_cust = get_time_elapsed(red_cust, time_spec=time_spec, data_extraction_date=data_extraction_date)

  return red_cust