# Overview
   ### 1) Interest-based recommendations
   ### 2) Recommendations based on past purchase
   ### 3) Items frequenty bought together
   ### 4) Items frequenty viwed together
   ### 5) Card Based recommendations
   ### 6) Popular viewed items
   ### 7) Best seller recomendation
   ### 8) Targetting ads recommendations 
   ### 9) Special event recommendations

# Interest-based ADs
 ### ALS - Algoritham will Recommends the products based on similar interest or tastes

In [2]:
# Library imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


In [3]:
# Spark session initialize 
spark = SparkSession.builder.master("local[*]").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/17 11:14:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Sample data for user ratings Initialization
ratings_input_file_path = "/Users/k0d04mr/dev/Applications/notebooks/data/ratings_Electronics.csv"
ratins_cleaned_dataset_path = "/Users/k0d04mr/dev/Applications/notebooks/data/cleaned/"

In [4]:
# Reading  sample data for user ratings for products 

schema = StructType(
    [
      StructField("userName",StringType()),
      StructField("productName",StringType()),
      StructField("ratings",DoubleType()),
      StructField("timestamp",LongType())
    ]
)       
raw_ratings_df = spark.read.format("csv").schema(schema).load(ratings_input_file_path)

print("ratings count", raw_ratings_df.count())
raw_ratings_df.show()

                                                                                

ratings count 7824482
+--------------+-----------+-------+----------+
|      userName|productName|ratings| timestamp|
+--------------+-----------+-------+----------+
| AKM1MP6P0OYPR| 0132793040|    5.0|1365811200|
|A2CX7LUOHB2NDG| 0321732944|    5.0|1341100800|
|A2NWSAGRHCP8N5| 0439886341|    1.0|1367193600|
|A2WNBOD3WNDNKT| 0439886341|    3.0|1374451200|
|A1GI0U4ZRJA8WN| 0439886341|    1.0|1334707200|
|A1QGNMC6O1VW39| 0511189877|    5.0|1397433600|
|A3J3BRHTDRFJ2G| 0511189877|    2.0|1397433600|
|A2TY0BTJOTENPG| 0511189877|    5.0|1395878400|
|A34ATBPOK6HCHY| 0511189877|    5.0|1395532800|
| A89DO69P0XZ27| 0511189877|    5.0|1395446400|
| AZYNQZ94U6VDB| 0511189877|    5.0|1401321600|
|A1DA3W4GTFXP6O| 0528881469|    5.0|1405641600|
|A29LPQQDG7LD5J| 0528881469|    1.0|1352073600|
| AO94DHGC771SJ| 0528881469|    5.0|1370131200|
| AMO214LNFCEI4| 0528881469|    1.0|1290643200|
|A28B1G1MSJ6OO1| 0528881469|    4.0|1280016000|
|A3N7T0DY83Y4IG| 0528881469|    3.0|1283990400|
|A1H8PY3QHMQQA0| 0

In [5]:
# Keeping  users who have rated frequently 

## To make better descision 
## we are including users who bought and rated more than 50 items

filteredRatingsDF = raw_ratings_df.select(
    "*",
    sum(lit(1)).over(Window.partitionBy("userName")).alias("user_count"),
).filter(
        col("user_count") >= 50
).withColumn(
    "ratings", col("ratings").cast(IntegerType())
)
print("count of rows where users rated above 50:", filteredRatingsDF.count())
filteredRatingsDF.show()



                                                                                

count of rows where users rated above 50: 125871




+--------------+-----------+-------+----------+----------+
|      userName|productName|ratings| timestamp|user_count|
+--------------+-----------+-------+----------+----------+
|A100UD67AHFODS| B00004Z5M1|      5|1350086400|       116|
|A100UD67AHFODS| B00005T3X7|      5|1354665600|       116|
|A100UD67AHFODS| B000069EUW|      5|1351814400|       116|
|A100UD67AHFODS| B000069JWX|      1|1070841600|       116|
|A100UD67AHFODS| B0000AR0I4|      5|1353369600|       116|
|A100UD67AHFODS| B0001D3K8A|      5|1150588800|       116|
|A100UD67AHFODS| B000233WJ6|      5|1369267200|       116|
|A100UD67AHFODS| B0002HJGUQ|      5|1360454400|       116|
|A100UD67AHFODS| B0002KVQBA|      5|1360886400|       116|
|A100UD67AHFODS| B0002SQ2P2|      5|1150588800|       116|
|A100UD67AHFODS| B000ERAOL4|      5|1350086400|       116|
|A100UD67AHFODS| B000H0K8VY|      5|1364947200|       116|
|A100UD67AHFODS| B000HZDF8W|      5|1356134400|       116|
|A100UD67AHFODS| B000J0072M|      3|1356134400|       11

                                                                                

In [20]:
# Creating userid and productID column 
userId_df = filteredRatingsDF.select("userName").dropDuplicates().coalesce(1).withColumn("userId", monotonically_increasing_id() + 1)
productId_df = filteredRatingsDF.select("productName").dropDuplicates().coalesce(1).withColumn("productId", monotonically_increasing_id() + 1)
userIdJoindDF = filteredRatingsDF.join(userId_df, on="userName", how="inner")
cleanedDF = userIdJoindDF.join(productId_df, on="productName", how="inner")
cleanedDF.summary().show()

# Writing cleaned dataset into disk
cleanedDF.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save(ratins_cleaned_dataset_path)

                                                                                

+-------+-------------------+--------------+----------------+-------------------+------------------+------------------+------------------+
|summary|        productName|      userName|         ratings|          timestamp|        user_count|            userId|         productId|
+-------+-------------------+--------------+----------------+-------------------+------------------+------------------+------------------+
|  count|             125871|        125871|          125871|             125871|            125871|            125871|            125871|
|   mean|5.134991108953125E9|          null|4.26133898991825|  1.3219790174512E9|107.99077627094407|  775.465071382606|15068.817503634673|
| stddev|4.215136210727178E9|          null|1.06214410824082|7.583598570410088E7| 78.10278159362238|449.20511305159215|13192.368850961084|
|    min|         0594451647|A100UD67AHFODS|               1|          939600000|                50|                 1|                 1|
|    25%|      1.400501776E

                                                                                

In [21]:
# Reading products summary rating data 

ratings_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(ratins_cleaned_dataset_path)
ratings_df.show()

+-----------+--------------+-------+----------+----------+------+---------+
|productName|      userName|ratings| timestamp|user_count|userId|productId|
+-----------+--------------+-------+----------+----------+------+---------+
| B00004Z5M1|A100UD67AHFODS|      5|1350086400|       116|     1|     2272|
| B00005T3X7|A100UD67AHFODS|      5|1354665600|       116|     1|     5072|
| B000069EUW|A100UD67AHFODS|      5|1351814400|       116|     1|     2407|
| B000069JWX|A100UD67AHFODS|      1|1070841600|       116|     1|     1049|
| B0000AR0I4|A100UD67AHFODS|      5|1353369600|       116|     1|      397|
| B0001D3K8A|A100UD67AHFODS|      5|1150588800|       116|     1|     4296|
| B000233WJ6|A100UD67AHFODS|      5|1369267200|       116|     1|     5802|
| B0002HJGUQ|A100UD67AHFODS|      5|1360454400|       116|     1|     3620|
| B0002KVQBA|A100UD67AHFODS|      5|1360886400|       116|     1|     1217|
| B0002SQ2P2|A100UD67AHFODS|      5|1150588800|       116|     1|     4492|
| B000ERAOL4

In [8]:
# Splitting the data into 2 for training through als algo and testing using random split 
(trainingDF, testDF) = ratings_df.randomSplit([0.8, 0.2])

In [9]:
# Training the model using als algo
als = ALS(
    maxIter=5, 
    regParam=0.01, 
    userCol="userId", 
    itemCol="productId", 
    ratingCol="ratings",
    implicitPrefs=True,
    coldStartStrategy="drop"
)
model = als.fit(trainingDF)

22/05/14 18:26:38 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/14 18:26:38 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/14 18:26:38 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [10]:
# Evaluate the model accuracy by computing error on test data  
predictions = model.transform(testDF)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratings",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

                                                                                

Root-mean-square error = 2.8970318109809927


In [25]:
# Recommending the product for each user 
userRecs = model.recommendForAllUsers(1)
userRecs.show(10, False)



+------+--------------------+
|userId|recommendations     |
+------+--------------------+
|1     |[{17759, 10.529773}]|
|3     |[{1369, 9.224239}]  |
|6     |[{26775, 9.756778}] |
|12    |[{564, 11.828667}]  |
|13    |[{17695, 11.306157}]|
|16    |[{19600, 8.861739}] |
|20    |[{14377, 10.893919}]|
|22    |[{16022, 10.94829}] |
|26    |[{21647, 10.295612}]|
|27    |[{23726, 8.875915}] |
+------+--------------------+
only showing top 10 rows



                                                                                

In [19]:
# joining back with ratings data set to get the actual user name and product name 

flattendRecDf = userRecs.withColumn(
    "recommendations", explode("recommendations")
).withColumn(
    "productId", col("recommendations.productId")
)
flattenUserIdJoinedDF = flattendRecDf.join(
    ratings_df.select(
        "userId",
        "userName"
    ).dropDuplicates(), 
    on="userId", 
    how="inner"
).join(
    ratings_df.select(
        "productName",
        "productId"
    ).dropDuplicates(),
    on="productId", 
    how="inner"
).select(
    col("userName"),
    col("productName").alias("recomendatedProductName")
)
# finalJoinedDF.show(100, False)

flattenUserIdJoinedDF.show(10, False)

                                                                                

+--------------+-----------------------+
|userName      |recomendatedProductName|
+--------------+-----------------------+
|A105S56ODHGJEK|B00BCGRTFK             |
|A10Y058K7B96C6|B000AYJDD6             |
|A12LH2100CKQO |B0026SKZO0             |
|A1CST2WUA32GP0|B002LITT56             |
|A1DQHS7MOVYYYA|B0002RBQO0             |
|A1K4G5YJDJQI6Q|B004QQY2JO             |
|A1SFPA80X7TRBR|B001212ELY             |
|A1URXSRV6WDHVY|B004XW2NEW             |
|A1ZVFCPHCWFV71|B001N9X4CS             |
|A20DDH4NT6Q1E8|B0098Y77U0             |
+--------------+-----------------------+
only showing top 10 rows



In [32]:
# Recommeding product for a user 
userId = 100
numberOfRecommendationsNeeded = 5
userRecs = model.recommendForUserSubset(ratings_df.where(col("userId") == userId), numberOfRecommendationsNeeded)
userRecs.show(5, False)

+------+------------------------------------------------------------------------------------------+
|userId|recommendations                                                                           |
+------+------------------------------------------------------------------------------------------+
|100   |[{7349, 9.862763}, {5623, 8.93787}, {8946, 8.914307}, {1008, 8.304108}, {10150, 8.182521}]|
+------+------------------------------------------------------------------------------------------+



# Summary of Interest based Ads
 ## ALS will give us the interested product names, we have to play the Ads on the screen for the same.
 # --------------------------------**--------------------------------

# Loading orders data 

In [4]:
# input paths
basePath = "/Users/k0d04mr/dev/Applications/notebooks/data"
events_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(basePath + "/events.csv")
category_tree_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(basePath + "/category_tree.csv")
item_properties_1_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(basePath + "/item_properties_part1.csv")
item_properties_2_df =  spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(basePath + "/item_properties_part2.csv")

                                                                                

In [4]:
# events data sample
events_df.show(10, False)

+-------------+---------+-----+------+-------------+
|timestamp    |visitorid|event|itemid|transactionid|
+-------------+---------+-----+------+-------------+
|1433221332117|257597   |view |355908|null         |
|1433224214164|992329   |view |248676|null         |
|1433221999827|111016   |view |318965|null         |
|1433221955914|483717   |view |253185|null         |
|1433221337106|951259   |view |367447|null         |
|1433224086234|972639   |view |22556 |null         |
|1433221923240|810725   |view |443030|null         |
|1433223291897|794181   |view |439202|null         |
|1433220899221|824915   |view |428805|null         |
|1433221204592|339335   |view |82389 |null         |
+-------------+---------+-----+------+-------------+
only showing top 10 rows



In [5]:
# category data sample
category_tree_df.show(10, False)

+----------+--------+
|categoryid|parentid|
+----------+--------+
|1016      |213     |
|809       |169     |
|570       |9       |
|1691      |885     |
|536       |1691    |
|231       |null    |
|542       |378     |
|1146      |542     |
|1140      |542     |
|1479      |1537    |
+----------+--------+
only showing top 10 rows



In [6]:
# item_properties_1 sample

item_properties_1_df.show(10, False)

+-------------+------+----------+-------------------------------+
|timestamp    |itemid|property  |value                          |
+-------------+------+----------+-------------------------------+
|1435460400000|460429|categoryid|1338                           |
|1441508400000|206783|888       |1116713 960601 n277.200        |
|1439089200000|395014|400       |n552.000 639502 n720.000 424566|
|1431226800000|59481 |790       |n15360.000                     |
|1431831600000|156781|917       |828513                         |
|1436065200000|285026|available |0                              |
|1434250800000|89534 |213       |1121373                        |
|1431831600000|264312|6         |319724                         |
|1433646000000|229370|202       |1330310                        |
|1434250800000|98113 |451       |1141052 n48.000                |
+-------------+------+----------+-------------------------------+
only showing top 10 rows



In [7]:
# item_properties_2 sample
item_properties_2_df.show(10, False)

+-------------+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp    |itemid|property|value                                                                                                                                                                                                                                                                                                        |
+-------------+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
events_df.filter(
    col("transactionid").isNotNull()
).groupBy(
    "visitorid",
    "itemid"
).count().filter(
    col("count") > 10
).show(10, False)

    

[Stage 12:>                                                       (0 + 16) / 16]

+---------+------+-----+
|visitorid|itemid|count|
+---------+------+-----+
|152963   |119736|14   |
+---------+------+-----+



                                                                                

 # --------------------------------**--------------------------------

# Recommendations based on past purchase
   ## 1) Based on frequently buying products from their purchase history
   ## 2) Randomly selecting the prodcuts from their purchase history

## 1) Based on frequently buying products from their purchase history

In [9]:
# Select User to recommend the prodcuts 
visitorid = 152963

In [10]:
filteredUserDataDF = events_df.filter(col("visitorid") == visitorid).filter(col("transactionid").isNotNull())

In [11]:
# Finding how frequently product is bought 
frequentlyboughtItemsDf = filteredUserDataDF.select(
    col("*"),
    sum(lit(1)).over(Window.partitionBy(col("itemid"))).alias("number_of_times_bought"),
    (col("timestamp") - lag(col("timestamp"), 1).over(Window.partitionBy("itemid").orderBy("timestamp"))).alias("freq")
).filter(
    col("freq").isNotNull()
).withColumn(
    "freq_avg", avg("freq").over(Window.partitionBy("itemid"))
).withColumn(
    "nextCycleTimeStamp", from_unixtime(((col("timestamp") + col("freq_avg")) / 1000).cast(LongType())
                                        )
).withColumn(
    "current_timestamp", from_unixtime(unix_timestamp())
)


In [44]:
frequentlyboughtItemsDf.show()

+-------------+---------+-----------+------+-------------+----------------------+----------+-------------------+-------------------+-------------------+
|    timestamp|visitorid|      event|itemid|transactionid|number_of_times_bought|      freq|           freq_avg| nextCycleTimeStamp|  current_timestamp|
+-------------+---------+-----------+------+-------------+----------------------+----------+-------------------+-------------------+-------------------+
|1439921960859|   152963|transaction|  9877|        16112|                     2| 956882069|       9.56882069E8|2015-08-30 01:37:22|2022-05-15 07:03:01|
|1440029657713|   152963|transaction| 17478|         6781|                     2|    266988|           266988.0|2015-08-20 05:48:44|2022-05-15 07:03:01|
|1441735783315|   152963|transaction| 57723|          884|                     2|3094072329|      3.094072329E9|2015-10-14 19:07:35|2022-05-15 07:03:01|
|1440775915015|   152963|transaction| 96940|        12881|                     2| 

In [12]:
# Filtring the products whose frequency time is completed
recommendatedItemsDF = frequentlyboughtItemsDf.filter(
    col("nextCycleTimeStamp") <= col("current_timestamp")
)
recommendatedItemsDF.show(10, False)

+-------------+---------+-----------+------+-------------+----------------------+----------+-------------------+-------------------+-------------------+
|timestamp    |visitorid|event      |itemid|transactionid|number_of_times_bought|freq      |freq_avg           |nextCycleTimeStamp |current_timestamp  |
+-------------+---------+-----------+------+-------------+----------------------+----------+-------------------+-------------------+-------------------+
|1439921960859|152963   |transaction|9877  |16112        |2                     |956882069 |9.56882069E8       |2015-08-30 01:37:22|2022-05-15 07:06:25|
|1440029657713|152963   |transaction|17478 |6781         |2                     |266988    |266988.0           |2015-08-20 05:48:44|2022-05-15 07:06:25|
|1441735783315|152963   |transaction|57723 |884          |2                     |3094072329|3.094072329E9      |2015-10-14 19:07:35|2022-05-15 07:06:25|
|1440775915015|152963   |transaction|96940 |12881        |2                     |1

In [13]:
# Selecting top 10 products
selectingProductByRank = recommendatedItemsDF.select(
    col("visitorid"),
    col("itemid"),
    col("number_of_times_bought")
).dropDuplicates().select(
    "*",
    rank().over(Window.orderBy(col("number_of_times_bought").desc())).alias("rank")
).filter(
    col("rank") <= 10
).select(
    col("visitorid"),
    col("itemid"),
    col("rank")
)

selectingProductByRank.show(10, False)

22/05/15 07:06:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/15 07:06:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/15 07:06:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/15 07:06:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------+------+----+
|visitorid|itemid|rank|
+---------+------+----+
|152963   |119736|1   |
|152963   |320130|2   |
|152963   |9877  |3   |
|152963   |17478 |3   |
|152963   |57723 |3   |
|152963   |96940 |3   |
|152963   |171427|3   |
|152963   |248455|3   |
|152963   |304839|3   |
|152963   |334401|3   |
+---------+------+----+
only showing top 10 rows



 # --------------------------------**--------------------------------

# 2) Randomly selecting the prodcuts from their purchase history

In [23]:
# Randomly selecting 10 products from their puschasing history and plyaing ads for the same.
filteredUserDataDF.select("itemid").dropDuplicates().sample(fraction=0.1).limit(10).show()

+------+
|itemid|
+------+
|104891|
|231482|
|315919|
|373861|
|330506|
|433305|
| 39773|
|245168|
|375150|
|213306|
+------+



# Items frequenty bought together

In [26]:
# Select User to recommend the prodcuts 
visitorid = 152963
customerCurrentlyViewingItem = 119736

In [29]:
# Filtering the users who bought the same item
coustomerBoughtSameProductDF = events_df.filter(
    col("transactionid").isNotNull()
).filter(
    col("itemId") == customerCurrentlyViewingItem
)
coustomerBoughtSameProductDF.show(10, False)

+-------------+---------+-----------+------+-------------+
|timestamp    |visitorid|event      |itemid|transactionid|
+-------------+---------+-----------+------+-------------+
|1433185467024|1161163  |transaction|119736|118          |
|1433377677494|438441   |transaction|119736|8568         |
|1433382787152|438441   |transaction|119736|2787         |
|1433455473821|1093035  |transaction|119736|15356        |
|1433438926327|438441   |transaction|119736|375          |
|1433518030494|1161163  |transaction|119736|3620         |
|1433522055496|138131   |transaction|119736|4440         |
|1433549771403|1161163  |transaction|119736|1178         |
|1433703973431|892355   |transaction|119736|10582        |
|1433799620641|1093035  |transaction|119736|6736         |
+-------------+---------+-----------+------+-------------+
only showing top 10 rows



In [37]:
# selecting other products which is purchased with currently viewing item
coProductsWithCurrentlyViewingDF = coustomerBoughtSameProductDF.alias("a").join(
    events_df.alias("b"),
    on="visitorid",
    how="inner"
).select(
    col("b.itemId")
    
).dropDuplicates()
coProductsWithCurrentlyViewingDF.show(10)



+------+
|itemId|
+------+
|227161|
|354173|
|310514|
|424293|
|  8638|
|208595|
| 11316|
|296211|
|209734|
|152836|
+------+
only showing top 10 rows



                                                                                

# So now we can present to the visitor a list of the other items a customer previously bought along with what item the current visitor is viewing e.g. item number 119736

In [38]:
# Removing the current viewing item 
coProductsOnlyDF = coProductsWithCurrentlyViewingDF.filter(
    col("itemId") != customerCurrentlyViewingItem
)
coProductsOnlyDF.show(10, False)

[Stage 75:===>                                                    (1 + 16) / 17]

+------+
|itemId|
+------+
|227161|
|354173|
|310514|
|424293|
|8638  |
|208595|
|11316 |
|296211|
|209734|
|152836|
+------+
only showing top 10 rows



                                                                                

# --------------------------------**--------------------------------


# Items frequenty viwed together

In [5]:
# Select User to recommend the prodcuts 
visitorid = 152963
customerCurrentlyViewingItem = 119736

In [7]:
# Filtering the users who viewed the same item
coustomerViewdSameProductDF = events_df.filter(
    col("event") == 'view'
).filter(
    col("itemId") == customerCurrentlyViewingItem
)
coustomerViewdSameProductDF.show(10, False)

+-------------+---------+-----+------+-------------+
|timestamp    |visitorid|event|itemid|transactionid|
+-------------+---------+-----+------+-------------+
|1433177781018|286616   |view |119736|null         |
|1433220222122|163253   |view |119736|null         |
|1433181192471|286616   |view |119736|null         |
|1433178692996|350566   |view |119736|null         |
|1433275279545|163561   |view |119736|null         |
|1433263385707|286616   |view |119736|null         |
|1433285028681|163561   |view |119736|null         |
|1433273140155|286616   |view |119736|null         |
|1433259935129|286616   |view |119736|null         |
|1433295027862|163561   |view |119736|null         |
+-------------+---------+-----+------+-------------+
only showing top 10 rows



In [8]:
# selecting other products which is viewed with currently viewing item
coProductsWithCurrentlyViewingDF = coustomerViewdSameProductDF.alias("a").join(
    events_df.alias("b"),
    on="visitorid",
    how="inner"
).select(
    col("b.itemId")
    
).dropDuplicates()
coProductsWithCurrentlyViewingDF.show(10)

[Stage 14:>                                                       (0 + 16) / 16]

+------+
|itemId|
+------+
|225877|
|  2142|
|446934|
|188718|
|429449|
|  9376|
|393144|
|409910|
|287568|
| 28170|
+------+
only showing top 10 rows



                                                                                

# So now we can present to the visitor a list of the other items a customer previously viewd along with what item the current visitor is viewing e.g. item number 119736

In [9]:
# Removing the current viewing item 
coProductsOnlyDF = coProductsWithCurrentlyViewingDF.filter(
    col("itemId") != customerCurrentlyViewingItem
)
coProductsOnlyDF.show(10, False)

                                                                                

+------+
|itemId|
+------+
|225877|
|2142  |
|446934|
|188718|
|429449|
|9376  |
|393144|
|409910|
|287568|
|28170 |
+------+
only showing top 10 rows



 
# --------------------------------**--------------------------------

# Card Based recommendations:
  
 ## Will be implimented following the techinical flow of the walmart app. Example the number of items recommended, the call to the specific API to build the data displayed in the card, the card formating, etc.
 
# --------------------------------**--------------------------------

# Popular viewed items:

## In future ,We will be suggesting further promotions and offers based on the views received from users on specific products

# --------------------------------**--------------------------------

# Best seller recomendation:

## In future ,We will be suggesting further promotions and offers based on the most selling products as per the interests received from multiple users

# --------------------------------**--------------------------------

# Targetting Ads:
## We can send recommendations based on and in correlation with marketing ads and promotions.

# --------------------------------**--------------------------------

# Special Events:
## We can send recommendations on special events such as holidays or regional festivals.

# --------------------------------End--------------------------------