Feature engineering depends on the type of ML model you will use. For our project, we will use collaborative filtering. This method makes recommendations on the assumption that the best recommendations come from people who have similar tastes. In other words, it uses the historical item ratings of like-minded people to predict how someone would rate an item.
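
To make the "like-minded people" idea concrete, here is a minimal pure-Python sketch of a neighbor-based prediction. The users, items, and ratings are made up for illustration, and cosine similarity is just one possible similarity measure. The ALS model trained later in this notebook works differently (it learns latent factors), so treat this as intuition only, not as what Spark does internally.

```python
from math import sqrt

# Toy ratings: user -> {item: rating}. All names and values are hypothetical.
ratings = {
    "alice": {"book_a": 5.0, "book_b": 4.0, "book_c": 1.0},
    "bob":   {"book_a": 5.0, "book_b": 5.0, "book_c": 1.0, "book_d": 5.0},
    "carol": {"book_a": 1.0, "book_b": 2.0, "book_c": 5.0, "book_d": 1.0},
}

def cosine_similarity(u, v):
    """Cosine similarity computed over the items both users have rated."""
    shared = set(u) & set(v)
    if not shared:
        return 0.0
    dot = sum(u[i] * v[i] for i in shared)
    norm_u = sqrt(sum(u[i] ** 2 for i in shared))
    norm_v = sqrt(sum(v[i] ** 2 for i in shared))
    return dot / (norm_u * norm_v)

def predict_rating(user, item):
    """Similarity-weighted average of other users' ratings for `item`."""
    weighted_sum, weight_total = 0.0, 0.0
    for other, other_ratings in ratings.items():
        if other == user or item not in other_ratings:
            continue
        sim = cosine_similarity(ratings[user], other_ratings)
        weighted_sum += sim * other_ratings[item]
        weight_total += sim
    return weighted_sum / weight_total if weight_total else None

# alice has not rated book_d; her tastes are closer to bob's than to carol's,
# so the prediction is pulled toward bob's rating.
predicted = predict_rating("alice", "book_d")
print(predicted)
```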


Collaborative filtering predicts the rating or preference of a user for a particular item. This rating can then be used to make recommendations for the user.

For example, if a user has rated several items highly, a collaborative filtering model can predict that user's rating for items they have not rated yet and recommend those with the highest predicted rating. The recommendations therefore reflect both the user's past behavior and the behavior of similar users.

To recommend items to a user, you can use the trained collaborative filtering model to predict the rating of the user for all the items in the dataset. Then, you can recommend the items with the highest predicted ratings to the user.
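
The ranking step described above can be sketched in a few lines of plain Python. The function name `recommend_top_n`, the item names, and the scores are hypothetical; the sketch assumes predicted ratings are already available as a dictionary and skips items the user has already rated.

```python
import heapq

def recommend_top_n(predicted_ratings, already_rated, n=3):
    """Return the n unrated items with the highest predicted rating."""
    candidates = {
        item: score
        for item, score in predicted_ratings.items()
        if item not in already_rated
    }
    # nlargest returns (item, score) pairs sorted from best to worst.
    return heapq.nlargest(n, candidates.items(), key=lambda pair: pair[1])

# Hypothetical predicted ratings for one user.
predicted_ratings = {
    "item_1": 4.8,
    "item_2": 3.1,
    "item_3": 4.9,
    "item_4": 2.0,
    "item_5": 4.2,
}
already_rated = {"item_3"}

top_items = recommend_top_n(predicted_ratings, already_rated, n=2)
print(top_items)
```

In Spark, a fitted `ALSModel` offers `recommendForAllUsers(numItems)` for the same purpose, so a hand-written ranking like this is mainly useful for understanding the step.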

In our dataset, the asin_index and reviewerID_index columns (the asin and reviewerID strings encoded as numeric indices) are the input features that identify which user rated which item. The overall column is the target variable that the model will try to predict, which is the rating that a user would give to a particular item.
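
The encoded ID columns are produced later in this notebook with Spark's `StringIndexer`. As a rough illustration of what such an encoding does, the sketch below assigns indices by descending frequency (ties broken alphabetically) and sends labels unseen during fitting to one extra index, which is how `handleInvalid='keep'` is documented to behave. The helper names and the sample IDs are hypothetical; this is not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index: most frequent label gets 0.0, and so on."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

def transform_keep_invalid(mapping, values):
    """Encode values; labels not seen during fitting share one extra index."""
    unseen_index = float(len(mapping))
    return [mapping.get(value, unseen_index) for value in values]

# Hypothetical reviewer IDs.
reviewer_ids = ["A3", "A1", "A3", "A2", "A3", "A1"]
reviewer_index = fit_string_index(reviewer_ids)

# "A9" was not present when the mapping was fitted.
encoded = transform_keep_invalid(reviewer_index, reviewer_ids + ["A9"])
print(reviewer_index)
print(encoded)
```

One consequence worth keeping in mind: if the data seen at fit time differs from the data seen at transform time, many rows collapse into the single "unseen" index.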

We will also drop all other columns.

By training a collaborative filtering model on this data, you can generate recommendations for users based on their past ratings and the ratings of similar users.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

spark=SparkSession \
    .builder \
    .appName("FT") \
    .master("local") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/") \
    .config("spark.driver.memory","12g")\
    .config("spark.executor.memory", "12g") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()


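# Read a random sample of 100,000 reviews from the amazon.data collection.
# $sample draws new rows on every read, so cache the result to keep later
# fit() and transform() calls working on the same sample.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("pipeline", "[{'$sample': {'size': 100000} }]") \
    .option("uri", "mongodb://localhost:27017/amazon.data") \
    .option("partitioner", "MongoSinglePartitioner") \
    .option("partitionkey", "asin") \
    .load() \
    .cache()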

In [2]:
df.show()

+--------------------+----------+-------+--------------------+--------------+---------------+--------------------+--------------+--------+
|                 _id|      asin|overall|          reviewText|    reviewerID|   reviewerName|             summary|unixReviewTime|verified|
+--------------------+----------+-------+--------------------+--------------+---------------+--------------------+--------------+--------+
|{6456654db3ab715e...|B005JOAOLW|    1.0|Horrible. If you ...| A6BJWZ82A94LH|     NAT TURNER|Horrible. If you ...|   1.5268608E9|    true|
|{64566174b3ab715e...|B00AAAH0Y4|    3.0|This watch looks ...|A1FREEHODOHCLK|         Jordan|Looks nice, but i...|   1.3842144E9|    true|
|{645665f4b3ab715e...|B015NFAKLK|    5.0|I would like more...|A3526B1LCK47X9|   1book-addict|             Awe....|   1.4435712E9|    true|
|{64566bccb3ab715e...|1904421148|    3.0|             Average|A281QJ939F5ZOK|  Margot Warner|         Three Stars|   1.4257728E9|    true|
|{645667bcb3ab715e...|B0039

In [2]:
df = df.drop('_id','reviewText','reviewerName','summary','unixReviewTime','verified')
df.show()

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|0674004582|    5.0|A2QC50SA8W5UZX|
|2376190053|    5.0|A2RZU0BOLNAWKE|
|B00IQOC30C|    5.0|A1VRJKI38G4GOV|
|B01C5BKWRM|    5.0|A10H9GVWWYCF2K|
|1534640150|    4.0|A3MM34BGYQ8AGA|
|B004IYSVOQ|    4.0| AD6S505KH3K65|
|B00NMSHJFQ|    5.0| AFVIXQURFEBOG|
|0765320320|    4.0|A21WBZYMCYKAK2|
|1940941717|    5.0|A1VS1MO7XTLY8K|
|1400112834|    5.0|A3OF9QTH843WL3|
|B00K1NPSWG|    5.0|A30UZHV7Z6FIRH|
|B01C2OMSVU|    5.0|A35INP86FVSYTX|
|B0027OWGL8|    5.0|A1KV7MAMHLFMPP|
|B003QX2JMA|    5.0|A2HCJ1J27OHEUU|
|B00IGUJZ5C|    5.0|A118FRGBVZ2NV4|
|B00005I9R3|    5.0|A1RJC0VSOXJV48|
|B000851FM4|    5.0|A2OFG8LSMPI5WM|
|B0058TW286|    5.0|A3RPR41UXZM5E2|
|0820320331|    5.0|A2L548U9O96BGM|
|B003BFR6SK|    5.0|  AJNZWYWMKTJ9|
+----------+-------+--------------+
only showing top 20 rows



In [3]:
from pyspark.ml.feature import StringIndexer

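# Encode reviewerID strings as numeric indices (ALS needs numeric user and item IDs).
# handleInvalid='keep' maps IDs not seen during fit() to one extra index.
reviwerID_indexer = StringIndexer(inputCol="reviewerID", outputCol="reviewerID_index", handleInvalid='keep')
fin_reviwerID = reviwerID_indexer.fit(df).transform(df)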
# fin_reviwerID.show()

In [4]:
#sample_df.printSchema()
# Encode asin (item ID) strings as numeric indices in the same way.
asin_indexer = StringIndexer(inputCol="asin", outputCol="asin_index", handleInvalid='keep')
fin_asi = asin_indexer.fit(df).transform(df)
# fin_asi.show()

In [5]:
fin_asi = fin_asi.drop("overall")

In [6]:
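from pyspark.sql.functions import col

# Index asin on the frame that already carries reviewerID_index, so each row
# keeps its own (item, user, rating) triple. Joining the two indexed frames on
# reviewerID alone would pair every item a reviewer rated with every rating
# that reviewer gave.
final_df = (asin_indexer.fit(fin_reviwerID)
            .transform(fin_reviwerID)
            .select(col('asin_index'), col('reviewerID_index'), col('overall'))
           )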


In [7]:
final_df.show()

+----------+----------------+-------+
|asin_index|reviewerID_index|overall|
+----------+----------------+-------+
|   11896.0|         10379.0|    4.0|
|   86994.0|         98441.0|    5.0|
|   86994.0|         98441.0|    1.0|
|   28799.0|         98441.0|    4.0|
|   86994.0|         98441.0|    5.0|
|   86994.0|         98441.0|    5.0|
|   86994.0|         98441.0|    5.0|
|   86994.0|         98441.0|    5.0|
|   11896.0|         10379.0|    2.0|
|   86994.0|         49422.0|    5.0|
|   86994.0|         49422.0|    5.0|
|   86994.0|         49422.0|    5.0|
|   86994.0|         98441.0|    4.0|
|   86994.0|         98441.0|    4.0|
|   11896.0|         10379.0|    4.0|
|   13084.0|         98441.0|    5.0|
|   86994.0|         36940.0|    5.0|
|   86994.0|           623.0|    5.0|
|   86994.0|           623.0|    5.0|
|   86994.0|         98441.0|    4.0|
+----------+----------------+-------+
only showing top 20 rows



In [8]:
# Keep one row per (item, user) pair, using the 'asin_index' and 'reviewerID_index' columns
final_df = final_df.dropDuplicates(['asin_index', 'reviewerID_index'])

# Drop rows that contain null or NaN values
final_df = final_df.na.drop()


In [9]:
final_df.show()

+----------+----------------+-------+
|asin_index|reviewerID_index|overall|
+----------+----------------+-------+
|      76.0|         98441.0|    5.0|
|   84533.0|         95564.0|    5.0|
|   41274.0|         98441.0|    5.0|
|   86994.0|         52520.0|    5.0|
|   86994.0|         68669.0|    5.0|
|    5032.0|         87474.0|    5.0|
|   86994.0|         29858.0|    5.0|
|    4676.0|         98441.0|    5.0|
|   28802.0|         98441.0|    5.0|
|   71500.0|         98441.0|    3.0|
|   86994.0|          1155.0|    4.0|
|    7050.0|         98441.0|    5.0|
|   86994.0|         20959.0|    4.0|
|    9853.0|         98441.0|    4.0|
|   62685.0|         98441.0|    5.0|
|   86994.0|         65726.0|    5.0|
|   86994.0|         57820.0|    5.0|
|   86994.0|         55181.0|    5.0|
|    7230.0|         98441.0|    5.0|
|   16475.0|         98441.0|    4.0|
+----------+----------------+-------+
only showing top 20 rows



In [10]:
#df_index = df_index.drop('asin','reviewerID')
#df_index = df_index.dropna()

In [11]:
#df_index.show()

In [12]:
#from pyspark.sql.functions import count
#final_df.groupBy('overall').agg(count('*').alias('count')).show()


In [13]:
#df_maj=df_index[final_df['overall']==5.0]
#f_min = df_index[final_df['overall']!=5.0]

In [14]:
from pyspark.sql.functions import col

#upsample_df = df_min.sample(True, float(df_maj.count())/float(df_min.count()), seed=50)
#upsample_df = upsample_df.union(df_maj)
#upsample_df.groupBy("overall").count().orderBy(col("count").desc()).show()


In [15]:
#upsample_df.head(5)

In [16]:
#df_index = upsample_df

In [17]:
# Hold out 25% of the ratings for testing; the seed fixes the split for a given input
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=42)

In [18]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

print("Reg Parm worked successfully")

# Train the ALS model on the training set
als_mf = ALS(maxIter=5, regParam=0.01, userCol="reviewerID_index", itemCol="asin_index", ratingCol="overall", coldStartStrategy="drop")
pipeline_mf = Pipeline(stages=[als_mf])
model_mf = pipeline_mf.fit(train_data)

# Make predictions on the testing set
predictions = model_mf.transform(test_data)
predictions = predictions.dropna()

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Reg Parm worked successfully
Root-mean-square error = 1.6446085733952953
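
For reference, the RMSE reported above is the square root of the mean squared difference between actual and predicted ratings. The sketch below computes it by hand on a few made-up values (they are not taken from this notebook's predictions); the function name `rmse` is only illustrative.

```python
from math import sqrt

def rmse(actual, predicted):
    """Root-mean-square error between two equally long lists of ratings."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical ratings on a 1-5 scale.
actual = [5.0, 4.0, 3.0, 1.0]
predicted = [4.0, 4.0, 5.0, 2.0]

error = rmse(actual, predicted)
print("Root-mean-square error = " + str(error))
```

Because RMSE is expressed in the same units as the ratings, a value around 1.64 on a 1 to 5 scale suggests that predictions are typically off by more than one and a half stars.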


In [19]:
predictions.head(5)

[Row(asin_index=2482.0, reviewerID_index=98441.0, overall=5.0, prediction=2.089376926422119),
 Row(asin_index=3112.0, reviewerID_index=98441.0, overall=4.0, prediction=3.9965786933898926),
 Row(asin_index=22547.0, reviewerID_index=98441.0, overall=5.0, prediction=2.997434139251709),
 Row(asin_index=86994.0, reviewerID_index=6.0, overall=5.0, prediction=4.9922661781311035),
 Row(asin_index=86994.0, reviewerID_index=18.0, overall=5.0, prediction=3.9938125610351562)]

In [28]:
predictions.show()

+----------+----------------+-------+----------+
|asin_index|reviewerID_index|overall|prediction|
+----------+----------------+-------+----------+
|      38.0|         98441.0|    4.0| 4.9957237|
|      93.0|         98441.0|    5.0| 3.7858934|
|    2730.0|         98441.0|    5.0| 4.9957237|
|    3112.0|         98441.0|    5.0| 3.9965787|
|   55337.0|         98441.0|    3.0|0.42313343|
|   86994.0|             1.0|    5.0|  4.997837|
|   86994.0|             4.0|    4.0| 4.9982038|
|   86994.0|            25.0|    2.0| 3.9938126|
|   86994.0|            31.0|    5.0|  4.992266|
|   86994.0|            43.0|    5.0|  4.997806|
|   86994.0|            46.0|    3.0| 3.9938126|
|   86994.0|            70.0|    5.0|  4.992266|
|   86994.0|            82.0|    3.0| 0.7334168|
|   86994.0|           132.0|    5.0|  4.992266|
|   86994.0|           137.0|    4.0|  3.999385|
|   86994.0|           287.0|    5.0|  4.992266|
|   86994.0|           436.0|    4.0|  4.992266|
|   86994.0|        

In [None]:
model.getBlockSize()

4096

In [None]:
df_index = df_index.drop('asin','reviewerID')
df_index.show()

+-------+------------+------------------+
|overall|asin_indexed|reviewerID_indexed|
+-------+------------+------------------+
|    1.0|    596146.0|          910547.0|
|    1.0|    596146.0|          910547.0|
|    4.0|     16719.0|          910547.0|
|    4.0|     16719.0|          910547.0|
|    4.0|     16719.0|          910547.0|
|    4.0|     16719.0|          910547.0|
|    4.0|     16719.0|          910547.0|
|    2.0|    192253.0|          910547.0|
|    3.0|    596146.0|          910547.0|
|    3.0|    596146.0|          910547.0|
|    1.0|    596146.0|          910547.0|
|    1.0|     81688.0|          910547.0|
|    4.0|    431711.0|          910547.0|
|    4.0|    431711.0|          910547.0|
|    4.0|    596146.0|          910547.0|
|    4.0|    596146.0|          910547.0|
|    4.0|    596146.0|          910547.0|
|    4.0|    596146.0|          910547.0|
|    4.0|      5104.0|          910547.0|
|    4.0|      5104.0|          910547.0|
+-------+------------+------------

In [None]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["asin_indexed", "reviewerID_indexed"],
                                   outputCol="VectorFeature")
df_index = featureassembler.transform(df_index)
df_index.show()

+-------+------------+------------------+-------------------+
|overall|asin_indexed|reviewerID_indexed|      VectorFeature|
+-------+------------+------------------+-------------------+
|    4.0|    328933.0|          910547.0|[328933.0,910547.0]|
|    4.0|    328933.0|          910547.0|[328933.0,910547.0]|
|    4.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    4.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    4.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    4.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    4.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    2.0|    105072.0|          910547.0|[105072.0,910547.0]|
|    1.0|    101315.0|          910547.0|[101315.0,910547.0]|
|    1.0|    101315.0|          910547.0|[101315.0,910547.0]|
|    2.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    1.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    3.0|    596146.0|          910547.0|[596146.0,910547.0]|
|    3.0

In [None]:
df = df_index.select("VectorFeature","overall")
df.show()

+-------------------+-------+
|      VectorFeature|overall|
+-------------------+-------+
|[596146.0,910547.0]|    3.0|
|[596146.0,910547.0]|    3.0|
|[203369.0,559904.0]|    1.0|
|[203369.0,559904.0]|    1.0|
|[203369.0,559904.0]|    1.0|
|[203369.0,559904.0]|    1.0|
|[203369.0,559904.0]|    1.0|
|[596146.0,910547.0]|    1.0|
|    [73.0,910547.0]|    4.0|
|    [73.0,910547.0]|    4.0|
| [146325.0,28561.0]|    1.0|
|  [6113.0,304020.0]|    3.0|
|[114288.0,910547.0]|    2.0|
|[114288.0,910547.0]|    2.0|
|  [1787.0,910547.0]|    4.0|
|  [1787.0,910547.0]|    4.0|
|  [1787.0,910547.0]|    4.0|
|  [1787.0,910547.0]|    4.0|
| [69611.0,646704.0]|    4.0|
| [69611.0,646704.0]|    4.0|
+-------------------+-------+
only showing top 20 rows



In [None]:
predictions.show()

+-------+------------+------------------+----------+
|overall|asin_indexed|reviewerID_indexed|prediction|
+-------+------------+------------------+----------+
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    1.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.7734962|
|    2.0|         0.0|          910547.0| 3.77