 ## Division Propensity Recommendation script     
Last update: March 3, 2021  
Version 1.2   
**Output format:** cmd_id | category | division_id | division | propensity_score | propensity_rank | next_best_rank

In [1]:
from pyspark.sql import functions as F
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType,FloatType, StringType
from pyspark.sql.functions import col, rank,concat, concat_ws, monotonically_increasing_id
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import monotonically_increasing_id

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1302,application_1570493391423_69060,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Register the S3 location Spark uses for checkpoints; a later cell calls
# DataFrame.checkpoint() on the exploded recommendations and needs it.
checkpoint_dir = "s3://nmg-analytics-ds-prod/ds/dev/Users/nmdst400/checkpointdir"
sc = spark.sparkContext
sc.setCheckpointDir(checkpoint_dir)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#Code Parameters
# `env` and `brand` are read by later cells to build the S3 input paths and the
# output path, but their assignments are commented out below. `wbrand` is not
# referenced by any cell visible in this notebook.
# NOTE(review): presumably these values are injected by whatever launches the
# notebook (scheduler / parameterised run) - confirm. On a fresh kernel without
# that injection, the cells that use `env` and `brand` raise NameError.
#brand = 'nm'
#env = 'prod'
#wbrand ='NM'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Location of the customer x division score file for the selected env / brand.
input_f = ''.join([
    's3://nmg-analytics-ds-prod/ds/', env,
    '/ClientConnect/CMD/data/', brand,
    '/cust_division_rank.csv.gz',
])

# No schema inference is requested, so every column is loaded as a string.
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv(input_f, sep=',')

# Composite item key "<category>_<division_id>_<division>"; this is the "item"
# that the recommender is trained on.
product_key = F.concat_ws('_', F.col('category'), F.col('division_id'), F.col('division'))
df = df.withColumn('product', product_key)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Encode the string customer id (`cmd_id`) as a numeric index in `userID`.
indexer1 = StringIndexer(inputCol="cmd_id", outputCol="userID")
user_indexer_model = indexer1.fit(df)
df_staging = user_indexer_model.transform(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Encode the composite `product` key as a numeric index in `catID`.
indexer2 = StringIndexer(inputCol="product", outputCol="catID")
df_staging = indexer2.fit(df_staging).transform(df_staging)

# Keep only the three columns the recommender consumes. `score` was loaded from
# CSV as a string, so it is cast to double here.
df_1 = df_staging.selectExpr('userID', 'catID', 'cast(score as double) score')

# Fixed seed so the 80/20 train/validation partition is the same on every run;
# without it each execution trains on a different random subset.
SPLIT_SEED = 42
(train_set, validation_set) = df_1.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Search optimal model

In [7]:
# Build the recommendation model using ALS on the training data.
# Explicit-feedback ALS (implicitPrefs=False) on the (userID, catID, score)
# triples; coldStartStrategy="drop" removes rows whose prediction would be NaN
# when the model is used to transform a dataset.
# The seed is pinned so the factor initialisation - and therefore the resulting
# scores and ranks - are reproducible from run to run.
ALS_SEED = 42
als = ALS(
    maxIter=10,
    rank=3,
    regParam=0.05,
    implicitPrefs=False,
    coldStartStrategy="drop",
    userCol='userID',
    itemCol='catID',
    ratingCol='score',
    seed=ALS_SEED,
)

model = als.fit(train_set)

# Hold-out evaluation on `validation_set` (currently disabled):
#predictions = model.transform(validation_set)

#evaluator = RegressionEvaluator(metricName="mae", labelCol="score", predictionCol="prediction")
#mae = evaluator.evaluate(predictions)

#print("Mean Absolute Error ", mae)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create the Recommendations for all users

In [8]:
# Generate the top 15 recommendations for each user (one row per user holding
# an array of (catID, rating) structs - see the printed schema).
userRecs = model.recommendForAllUsers(15)

# Turns on Arrow-based columnar transfer for Spark <-> pandas conversions.
# NOTE(review): no cell visible in this notebook converts to or from pandas,
# so this setting may be unnecessary here - confirm before removing.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
userRecs.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userID: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- catID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

In [9]:
# Flatten the per-user recommendation array into one row per recommended item.
# posexplode emits `pos` (0-based position in the array) and `col` (the struct).
exploded_recs = userRecs.select('userID', F.posexplode('recommendations'))
flat_recs = exploded_recs.select('userID', 'col.catID', 'pos', 'col.rating')

# Checkpoint to materialise the result and cut the lineage before the joins.
userRecs_output = flat_recs.checkpoint()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Lookup tables that map the numeric indices back to the original identifiers.
user_id_as_int = F.col('userID').cast('int').alias('userID')
userIndex = df_staging.select(user_id_as_int, 'cmd_id').distinct()

cat_id_as_int = F.col('catID').cast('int').alias('catID')
catIndex = df_staging.select(cat_id_as_int, 'product').distinct()

# Replace the item index on each recommendation with its composite product key.
same_item = userRecs_output.catID == catIndex.catID
recs_with_product = userRecs_output.join(catIndex, on=same_item)
recs_df = recs_with_product.select('userID', 'product', 'pos', 'rating')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Replace the user index with the original customer id, and expose the array
# position as a 1-based `propensity_rank` and the ALS rating as
# `propensity_score`.
same_user = recs_df.userID == userIndex.userID
recs_df = recs_df.join(userIndex, on=same_user).select(
    'cmd_id',
    'product',
    (F.col('pos') + 1).alias('propensity_rank'),
    F.col('rating').alias('propensity_score'),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The prediction scores returned by Apache Spark's **ALS** are not normalized to lie in [0, 1] or any other fixed range; see [this discussion](https://stackoverflow.com/questions/46904078/spark-als-recommendation-system-have-value-prediction-greater-than-1). This is true for implicit-feedback models and equally for the explicit-feedback model (`implicitPrefs=False`) trained above. ALS (alternating least squares) uses approximations to compute (and re-compute) the user and item factors on each step, holding one set fixed while solving a least-squares problem for the other, in order to minimize the cost function; this is what allows it to scale.  
As a matter of fact, normalizing those scores isn't strictly necessary, because the scores don't mean much on their own. See [this answer](https://stackoverflow.com/questions/46462470/how-can-i-evaluate-the-implicit-feedback-als-algorithm-for-recommendations-in-ap/46490352#46490352).

### Scaling into range 1-100

In [12]:
# Min-max scale `propensity_score` into [SCALE_MIN, SCALE_MAX].
SCALE_MIN, SCALE_MAX = 1, 100

# Fetch both bounds with a single DataFrame aggregation (one Spark job, no
# Python-side Row comparison) instead of two separate RDD passes.
score_bounds = recs_df.agg(
    F.min("propensity_score").alias("min_score"),
    F.max("propensity_score").alias("max_score"),
).first()
min_recs, max_recs = score_bounds["min_score"], score_bounds["max_score"]

if min_recs is None or max_recs is None:
    # No recommendations (or only null scores): there is nothing to scale.
    raise ValueError("Cannot scale propensity_score: recs_df has no scores")

score_range = max_recs - min_recs
if score_range == 0:
    # All scores are identical; dividing by a zero range would turn every
    # score into null, so map them all to the top of the scale instead.
    scaled_score = F.lit(float(SCALE_MAX))
else:
    scaled_score = (
        (SCALE_MAX - SCALE_MIN) * (F.col("propensity_score") - min_recs) / score_range
        + SCALE_MIN
    )

recs_df = recs_df.withColumn("propensity_score", scaled_score)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create new key column to join

In [13]:
# Single-string key (customer id immediately followed by the product key) used
# to match recommendations against recent purchases.
customer_product_key = F.concat(F.col('cmd_id'), F.col('product'))
recs_df = recs_df.withColumn("id_product", customer_product_key)
# recs_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+---------------+------------------+--------------------+
|  cmd_id|             product|propensity_rank|  propensity_score|          id_product|
+--------+--------------------+---------------+------------------+--------------------+
| W75LuqZ|Women's Apparel_1...|              1| 61.12565059214503|W75LuqZWomen's Ap...|
| W75LuqZ|Contemporary Appa...|              2| 60.37537201631657|W75LuqZContempora...|
| W75LuqZ|    Beauty_53_Beauty|              3| 57.89019971226391|W75LuqZBeauty_53_...|
| W75LuqZ|Gifts & Home_21_G...|              4| 56.76275366162963|W75LuqZGifts & Ho...|
| W75LuqZ|Fine Apparel_81_F...|              5|56.128202654150016|W75LuqZFine Appar...|
| W75LuqZ|Intimate Apparel_...|              6| 55.99478491577048|W75LuqZIntimate A...|
| W75LuqZ|Ladies Shoes_34_L...|              7|55.555856982762336|W75LuqZLadies Sho...|
| W75LuqZ|Children's_15_Chi...|              8| 55.24015338089912|W75LuqZChildren's...|
| W75LuqZ|  Jewelry_56_Jewelry| 


### Load recent historical transaction data (default = 30 days)

In [14]:
# Folder holding the per-brand input files for the selected environment.
input_dir = (
    's3://nmg-analytics-ds-prod/ds/' + env
    + '/ClientConnect/CMD/data/' + brand
    + '/'
)
f = input_dir + 'recent_history.csv.gz'

# Recent purchase history; the header row supplies the column names.
recent_reader = spark.read.option("header", "True")
df_recent = recent_reader.csv(f)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Build the same composite product key that the recommendations carry
# ("<category>_<division_id>_<division>") so recent purchases can be matched
# against them. The previous version always used `division` as the first
# component, which only matches when category and division have the same name.
# If the recent-history file has no `category` column, fall back to `division`
# so the cell still runs exactly as before.
leading_component = 'category' if 'category' in df_recent.columns else 'division'
df_recent = (
    df_recent
    .withColumn('product', concat_ws('_', leading_component, 'division_id', 'division'))
    .select('cmd_id', 'product')
)

# Concatenated customer+product string key, in the same format as on recs_df.
df_recent = df_recent.withColumn("id_product", F.concat(col('cmd_id'), col('product')))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Keep only the recommendations whose (customer, product) pair does NOT appear
# in the recent purchase history loaded above. From here on `df_recent` holds
# these not-recently-purchased recommendations.
# The anti-join uses the real key columns rather than the concatenated
# `id_product` string: that string has no separator, so two different
# (cmd_id, product) pairs could produce the same text and match by accident.
# NOTE(review): the earlier comment said "3 year hist trans" while the section
# heading says the history window defaults to 30 days - confirm which applies.
df_recent = recs_df.join(df_recent, on=['cmd_id', 'product'], how='leftanti')
df_recent = df_recent.selectExpr('cmd_id', 'product', 'propensity_score')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Re_order after merge
df_recent = df_recent.orderBy(F.col("cmd_id"),F.col("propensity_score").desc())
df_recent = df_recent.selectExpr("cmd_id", "product","propensity_score")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
from pyspark.sql.window import Window

# Number each customer's not-recently-purchased recommendations 1, 2, 3, ...
# in descending score order; this becomes `next_best_rank`.
window = Window.partitionBy('cmd_id').orderBy(F.desc('propensity_score'))
df_recent = df_recent.withColumn('next_best_rank', F.row_number().over(window))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Attach `next_best_rank` to the full recommendation list. The join is a left
# join, so recommendations that were filtered out as recent purchases keep a
# null `next_best_rank`.
next_best = df_recent.select('cmd_id', 'product', 'next_best_rank')
recs_df = (
    recs_df
    .select('cmd_id', 'product', 'propensity_score', 'propensity_rank')
    .join(next_best, on=['cmd_id', 'product'], how='left')
    .orderBy(F.col("cmd_id"), F.desc("propensity_score"), F.col("next_best_rank"))
)
# recs_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Recover category / division_id / division from the composite `product` key
# ("<category>_<division_id>_<division>").
#
# The columns are derived directly on each row of `recs_df`. The previous
# approach built a second DataFrame from the split values, stamped both frames
# with monotonically_increasing_id() and joined on that id. Those ids depend on
# how each frame happens to be partitioned when it is evaluated, so the two
# sides are not guaranteed to line up and a row could receive another row's
# category/division. It also forced an RDD round-trip.
#
# NOTE(review): assumes neither the category nor the division name contains an
# underscore; if one does, the pieces shift - confirm against the input data.
product_parts = F.split(F.col("product"), "_")

recs_df = (
    recs_df
    .withColumn("category", product_parts.getItem(0))
    .withColumn("division_id", product_parts.getItem(1))
    .withColumn("division", product_parts.getItem(2))
    .orderBy(F.col("cmd_id"), F.col("propensity_score").desc(), F.col("next_best_rank"))
)


In [None]:
# Final column layout, matching the output format stated at the top of the
# notebook.
output_columns = [
    'cmd_id',
    'category',
    'division_id',
    'division',
    'propensity_score',
    'propensity_rank',
    'next_best_rank',
]
recs_df = recs_df.select(*output_columns)
# recs_df.show()

### Saving to S3

In [None]:

# Persist the recommendations as Parquet under the env / brand specific output
# folder, replacing whatever a previous run left there.
output_path = "s3a://nmg-analytics-ds-prod/ds/{0}/product_propensity/{1}/output/division_propensity/".format(env, brand)
recs_df.write.mode("overwrite").parquet(output_path)