In [0]:
# Import required libraries
from pyspark.sql import functions as F
# Load October raw data
october_df = spark.read.csv("/Volumes/workspace/advecom/advecom_data/2019-Oct.csv", header=True, inferSchema=True)

# Create user feature dataframe, with necessary features only 
user_features_df = october_df.filter(F.col("price") > 0) \
    .filter("user_id IS NOT NULL") \
    .dropDuplicates(["user_id","product_id","event_type","event_time"]) \
    .groupBy("user_id") \
    .agg(
        F.sum(
            F.when(F.col("event_type")=="view",1)
            .otherwise(0)
        ).alias("total_views"),
        F.sum(
            F.when(F.col("event_type")=="cart",1)
            .otherwise(0)
        ).alias("total_cart"),
        F.sum(
            F.when(F.col("event_type")=="purchase",1)
            .otherwise(0)
        ).alias("total_purchases"),
        F.avg("price").alias("avg_price"),
        F.count("user_id").alias("total_events"),
        F.sum(F.when(F.col("event_type")=="purchase", F.col("price")).otherwise(0)).alias("total_spent")
    )
    
# Check user features
display(user_features_df.limit(5))

user_id,total_views,total_cart,total_purchases,avg_price,total_events,total_spent
516407514,19,2,1,224.0890909090909,22,231.64
547701060,14,2,0,1119.1575,16,0.0
514555296,65,0,1,283.03409090909093,66,244.28
555846537,23,2,2,203.63814814814816,27,368.04
513383856,11,0,0,724.3663636363636,11,0.0


In [0]:
# Create target label dataframe from user features
labeled_df = user_features_df.withColumn(
    "label",
    F.when(F.col("total_purchases")>0,1)
    .otherwise(0)
    )

# Check labeled dataframe
display(labeled_df)

# Check distribution of labels
labeled_df.groupBy("label").count().show()

user_id,total_views,total_cart,total_purchases,avg_price,total_events,total_spent,label
516407514,19,2,1,224.0890909090909,22,231.64,1
547701060,14,2,0,1119.1575,16,0.0,0
514555296,65,0,1,283.03409090909093,66,244.28,1
555846537,23,2,2,203.63814814814816,27,368.04,1
513383856,11,0,0,724.3663636363636,11,0.0,0
519148299,11,1,3,248.33800000000005,15,483.4500000000001,1
516264626,8,0,0,392.1275,8,0.0,0
515936492,30,0,0,479.965,30,0.0,0
521328224,13,0,0,157.47384615384615,13,0.0,0
535611590,10,0,0,7.726999999999999,10,0.0,0


+-----+-------+
|label|  count|
+-----+-------+
|    1| 347118|
|    0|2674317|
+-----+-------+



In [0]:
# Create two dataframes for purchase and non-purchase, and then combine them with a smaller fraction of non purchase. As a result the model will be trained on a balanced dataset.
purchase_df = labeled_df.filter("label=1")

non_purchase_df = labeled_df.filter("label=0") \
.sample(fraction=0.13)

balanced_df = purchase_df.union(non_purchase_df)

balanced_df.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|347118|
|    0|347499|
+-----+------+



In [0]:
# Import required libraries
from pyspark.ml.feature import VectorAssembler

# Create features vector for ML
assembler = VectorAssembler(
    inputCols=[
        "total_views",
        "total_cart",
        "total_spent",
        "total_events",
        "avg_price"
    ],
    outputCol="features"
)

# Create scoring dataframe with user_id, features, and label
scoring_df = assembler.transform(balanced_df).select("user_id","features","label")

display(scoring_df)

user_id,features,label
516407514,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""19.0"",""2.0"",""231.64"",""22.0"",""224.08909090909094""]}",1
514555296,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""65.0"",""0.0"",""244.28"",""66.0"",""283.03409090909093""]}",1
555846537,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""23.0"",""2.0"",""368.04"",""27.0"",""203.63814814814816""]}",1
519148299,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""11.0"",""1.0"",""483.45000000000005"",""15.0"",""248.33800000000005""]}",1
515880464,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""41.0"",""0.0"",""274.91"",""42.0"",""541.437380952381""]}",1
544268631,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""38.0"",""0.0"",""574.02"",""39.0"",""302.8282051282051""]}",1
549277997,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""150.0"",""0.0"",""266.4"",""153.0"",""76.1002614379085""]}",1
543232438,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""108.0"",""1.0"",""88.79"",""110.0"",""283.26227272727266""]}",1
514508014,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""53.0"",""4.0"",""86.48"",""59.0"",""86.27898305084747""]}",1
556010139,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""3.0"",""1.0"",""359.03"",""5.0"",""359.03""]}",1


In [0]:
# Import required libraries
import mlflow.spark
import os

# Get model uri from the best pre-trained model from the previous runs
model_uri = "runs:/97ff47fb9fbe4952b0aff2569a17aa94/logistic_model"
# Set mlflow dfs tmp, as it is needed for model loading
os.environ['MLFLOW_DFS_TMP'] = "/Volumes/workspace/advecom/advecom_data/mlflowlog"
best_model = mlflow.spark.load_model(model_uri)

# Make predictions on entire data
lr_pred = best_model.transform(scoring_df)

display(lr_pred)


In [0]:
# Import required libraries
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

# Create gold dataframe containing score for all users
gold_df = lr_pred.select(
    "user_id",
    vector_to_array(F.col("probability"))[1].alias("purchase_probability"),
    "prediction"
)

display(gold_df)

user_id,purchase_probability,prediction
516407514,1.0,1.0
514555296,1.0,1.0
555846537,1.0,1.0
519148299,1.0,1.0
515880464,1.0,1.0
544268631,1.0,1.0
549277997,1.0,1.0
543232438,1.0,1.0
514508014,1.0,1.0
556010139,1.0,1.0


In [0]:
# Save the gold dataframe in delta format
gold_df.write.format("delta") \
.mode("overwrite") \
.save("/Volumes/workspace/advecom/advecom_data/gold_user_scores")

# Create top 10 users dataframe
top_users = gold_df.orderBy(
    col("purchase_probability").desc()
)

display(top_users.limit(10))

user_id,purchase_probability,prediction
526720323,1.0,1.0
516407514,1.0,1.0
539569520,1.0,1.0
514555296,1.0,1.0
542502687,1.0,1.0
555846537,1.0,1.0
542791806,1.0,1.0
519148299,1.0,1.0
516221463,1.0,1.0
515880464,1.0,1.0
