### TC4034.10 Análisis de grandes volúmenes de datos

#### Equipo 13


* Hansel Zapiain Rodríguez (A00469031)
* Miguel Guillermo Galindo Orozco (A01793695)
* Francisco José Arellano Montes (A01794283)

**9.5 Avance de proyecto 4: Sistema de Recomendación**
   
Junio 2024

**Objetivos**


El desarrollo de esta actividad contribuye al cumplimiento de los objetivos del Modulo 5:

Aplicar algoritmos de machine learning a big data enfocado al modelado predictivo y toma de decisiones basada en datos.
Identificar la intersección entre Big Data e Inteligencia Artificial.
Reconocer la aplicación de algoritmos de machine learning en el análisis de Big Data

**Insciones**


onesEn esta entrega es necesario realizar un reporte donde se enlisten los siguientes aspectos:

Implementación final de sistemas de recomendación. Integra la evidencia en GitHub de los algoritmos desarrollados en los avances 4.2 y/o 6.2.
Evaluación integral del desempeño de los modelos utilizando varias métricas. Recuerda integrar la evidencia en el repositorio GitHub del equipo.
Documentación del código base y algoritmos implementados. Entregar en el documento word/pdf en Canvas.po).

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.sparse as sp
import itertools
import matplotlib.pyplot as plt
import os
import sys
import findspark
import implicit
import cudf
import math

from datasets import load_dataset

from pyspark import SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, udf, expr, row_number, collect_set
from pyspark.sql.types import FloatType, IntegerType, ArrayType

from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.ml.evaluation import RegressionEvaluator

print("All libraries imported successfully.")

All libraries imported successfully.


In [2]:
dataset = load_dataset('McAuley-Lab/Amazon-Reviews-2023', 'raw_review_All_Beauty', download_mode='force_redownload', trust_remote_code=True)

Downloading builder script:   0%|          | 0.00/39.6k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/19.7k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/327M [00:00<?, ?B/s]

Generating full split:   0%|          | 0/701528 [00:00<?, ? examples/s]

In [3]:
df_amz = dataset['full'].to_pandas()
df_amz.head()

Unnamed: 0,rating,title,text,images,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase
0,5.0,Such a lovely scent but not overpowering.,This spray is really nice. It smells really go...,[],B00YQ6X8EO,B00YQ6X8EO,AGKHLEW2SOWHNMFQIJGBECAF7INQ,1588687728923,0,True
1,4.0,Works great but smells a little weird.,"This product does what I need it to do, I just...",[],B081TJ8YS3,B081TJ8YS3,AGKHLEW2SOWHNMFQIJGBECAF7INQ,1588615855070,1,True
2,5.0,Yes!,"Smells good, feels great!",[],B07PNNCSP9,B097R46CSY,AE74DYR3QUGVPZJ3P7RFWBGIX7XQ,1589665266052,2,True
3,1.0,Synthetic feeling,Felt synthetic,[],B09JS339BZ,B09JS339BZ,AFQLNQNQYFWQZPJQZS6V3NZU4QBQ,1643393630220,0,True
4,5.0,A+,Love it,[],B08BZ63GMJ,B08BZ63GMJ,AFQLNQNQYFWQZPJQZS6V3NZU4QBQ,1609322563534,0,True


In [4]:
gdf = cudf.DataFrame.from_pandas(df_amz)

In [5]:
pdf_result = gdf.to_pandas()

## Pyspark Setup

In [6]:
findspark.init()
findspark.find()

'/opt/conda/envs/rapids/lib/python3.11/site-packages/pyspark'

In [7]:
log4j_properties = '/app/config/log4j.properties'

In [8]:
spark = SparkSession.builder \
    .appName('Amazon Reviews Recommender with GPU') \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
    .config('spark.driver.memory', '32g') \
    .config('spark.executor.memory', '32g') \
    .config('spark.sql.shuffle.partitions', '500') \
    .config('spark.memory.fraction', '0.8') \
    .config('spark.memory.storageFraction', '0.5') \
    .config('spark.master', 'local[*]') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.driver.extraJavaOptions', f'-Dlog4j.configuration=file:{log4j_properties}') \
    .config('spark.executor.extraJavaOptions', f'-Dlog4j.configuration=file:{log4j_properties}') \
    .config('spark.kryoserializer.buffer.max', '2024m') \
    .config('spark.broadcast.compress', 'true') \
    .config('spark.shuffle.compress', 'true') \
    .config('spark.shuffle.spill.compress', 'true') \
    .config('spark.sql.autoBroadcastJoinThreshold', '20m') \
    .getOrCreate()

print('Spark Session Created:', spark)

Spark Session Created: <pyspark.sql.session.SparkSession object at 0x7fb0accece90>


In [9]:
sc = SparkContext.getOrCreate()
sc.setLogLevel("WARN")

In [10]:
spark

In [11]:
df_amz_spark = pdf_result.sample(frac = 0.10, replace = False, random_state = 1234)
df_amz_spark.head()

Unnamed: 0,rating,title,text,images,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase
169932,5.0,EZ and great fun!,I brought these to a BBQ at our senior shindig...,[],B07H7C4SJK,B07H7C4SJK,AFWUQ2CT6NCFBTTNI3OCWDB3CRYQ,1561426574699,2,True
117265,5.0,Beautiful!!,I got so many compliments with this hair. Shi...,[],B07T9GXWLT,B07T9GXWLT,AGI5JHRJ2PSK5BURGV762KIG2Y5A,1601530836697,0,True
30312,5.0,Beautiful,Lovely little piece!,[],B073R5WZ4F,B073R5WZ4F,AEDSDRACFPPSWJTYBRFPMHUMHWDQ,1554611883925,0,True
335686,4.0,Not a gluten Free,Lovely product but it contains barley so it ir...,[],B071S98JST,B071S98JST,AEHQ7LKVLDNZOYSMFQFJU6DORK3Q,1681876323469,0,True
101796,3.0,Ehh!,A different quality of hair than what I am use...,[],B077BZCPSK,B077BZCPSK,AHTCO6WZEFV2UURBBZCLOKLSA64A,1562863655282,0,True


In [12]:
spark_df = spark.createDataFrame(df_amz_spark[['user_id', 'asin', 'rating']])

In [13]:
spark_df = spark_df.select(col('user_id').alias('user_id'), col('asin').alias('asin'), col('rating').alias('rating'))

user_indexer = StringIndexer(inputCol = 'user_id', outputCol = 'userIndex')
item_indexer = StringIndexer(inputCol = 'asin', outputCol = 'itemIndex')

indexed_df = user_indexer.fit(spark_df).transform(spark_df)
indexed_df = item_indexer.fit(indexed_df).transform(indexed_df)

                                                                                

In [14]:
indexed_df.select('user_id').show(truncate = False)

+----------------------------+
|user_id                     |
+----------------------------+
|AFWUQ2CT6NCFBTTNI3OCWDB3CRYQ|
|AGI5JHRJ2PSK5BURGV762KIG2Y5A|
|AEDSDRACFPPSWJTYBRFPMHUMHWDQ|
|AEHQ7LKVLDNZOYSMFQFJU6DORK3Q|
|AHTCO6WZEFV2UURBBZCLOKLSA64A|
|AE2L2THU72X2IWZWL2CR3COQVQ3A|
|AF6PUJQU3D2RLAL6OIVLRWPTB75A|
|AEKC3DWVNGFMF2HUGKKBAYXIZQ6A|
|AGASM3YZF2IWIMF5LCLJOJ27DWSA|
|AFYUWHEP53VBBYUJ4JBNRNC7NFUA|
|AHBUU6NLVFYWQMXYHZS7O6EZAAGQ|
|AEHE4FVFEUFFT3Z3G2CKJD2E6FXA|
|AFLCBZENI2BIM356FE6YRQNOYHDQ|
|AFOZH26SDYXHXA2P43KF7SOPLIHA|
|AEFSFOWRUNELLK3BGSXUER7UHLKA|
|AHMONSTMXMAFEGIANY6CNFIYC5EQ|
|AG5MHGXJMURATJMQTLPVQKYP5YCA|
|AEAVFP7USC423CPAHKI4PHXBNC7Q|
|AG2RVJP2CMPYQY6EZ5EGEIO23S4Q|
|AG7AHV77OZWW27JMAZ2K6GKCEL5Q|
+----------------------------+
only showing top 20 rows



In [15]:
indexed_df.count()

70153

In [16]:
indexed_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- userIndex: double (nullable = false)
 |-- itemIndex: double (nullable = false)



## ALS (PySpark)

In [17]:
(training_df, test_df) = indexed_df.randomSplit([0.8, 0.2])
training_df = training_df.repartition(500).cache()
test_df = test_df.repartition(500).cache()

In [18]:
als = ALS(maxIter = 10, regParam = 0.1, userCol = 'userIndex', itemCol = 'itemIndex', ratingCol = 'rating', coldStartStrategy = 'drop', nonnegative = True)
model = als.fit(training_df)

24/06/17 02:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:15 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:16 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:18 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:18 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/06/17 02:33:18 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:18 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:18 WARN DAGSche

In [19]:
userRecs = model.recommendForAllUsers(10)

In [20]:
recommendations = userRecs.withColumn('rec_exp', F.explode('recommendations')).select('userIndex', F.col('rec_exp.itemIndex').alias('itemIndex'), F.col('rec_exp.rating').alias('predicted_rating'))

In [21]:
user_id_mapping = indexed_df.select('userIndex', 'user_id').distinct()
item_id_mapping = indexed_df.select('itemIndex', 'asin').distinct()

recommendations = recommendations.join(user_id_mapping, on = 'userIndex').join(item_id_mapping, on = 'itemIndex')

In [22]:
user_ids = ['AFWUQ2CT6NCFBTTNI3OCWDB3CRYQ'] 
specific_user_recs = recommendations.filter(recommendations.user_id.isin(user_ids))
specific_user_recs.show(truncate = False)

24/06/17 02:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


+---------+---------+----------------+-------+----+
|itemIndex|userIndex|predicted_rating|user_id|asin|
+---------+---------+----------------+-------+----+
+---------+---------+----------------+-------+----+



24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


In [23]:
def evaluate_rmse(model, test_df):
    predictions = model.transform(test_df)
    evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')
    rmse = evaluator.evaluate(predictions)
    return rmse

In [24]:
def precision_recall_at_k(predictions, k = 10):
    total_users = predictions.select('user_id').distinct().count()
    
    true_positive = predictions.filter(predictions.itemIndex == predictions.trueItemIndex).count()
    
    recommended = predictions.count()
    
    precision = true_positive / recommended
    recall = true_positive / (total_users * k) 
    
    return total_users, precision, recall

In [25]:
def map_at_k(predictions, k=10):
    def apk(actual, predicted, k):
        if len(predicted) > k:
            predicted = predicted[:k]
        score = 0.0
        num_hits = 0.0
        for i, p in enumerate(predicted):
            if p in actual and p not in predicted[:i]:
                num_hits += 1.0
                score += num_hits / (i + 1.0)
        return score / min(len(actual), k)

    pred_items = predictions.groupBy('user_id').agg(F.collect_list('asin').alias('predicted_items'))
    actual_items = test_df.groupBy('user_id').agg(F.collect_list('asin').alias('actual_items'))

    pred_actual = pred_items.join(actual_items, 'user_id')

    mapk = pred_actual.rdd.map(lambda row: apk(row.actual_items, row.predicted_items, k)).mean()
    return mapk

In [26]:
def ndcg_at_k(predictions, k=10):
    def dcg(actual, predicted, k):
        dcg_score = 0.0
        for i, p in enumerate(predicted[:k]):
            if p in actual:
                dcg_score += 1.0 / math.log2(i + 2)
        return dcg_score

    def idcg(actual, k):
        idcg_score = 0.0
        for i in range(min(len(actual), k)):
            idcg_score += 1.0 / math.log2(i + 2)
        return idcg_score

    def ndcg(actual, predicted, k):
        return dcg(actual, predicted, k) / idcg(actual, k)

    pred_items = predictions.groupBy('user_id').agg(F.collect_list('asin').alias('predicted_items'))
    actual_items = test_df.groupBy('user_id').agg(F.collect_list('asin').alias('actual_items'))

    pred_actual = pred_items.join(actual_items, 'user_id')

    ndcgk = pred_actual.rdd.map(lambda row: ndcg(row.actual_items, row.predicted_items, k)).mean()
    return ndcgk

In [27]:
def mrr_at_k(predictions, k=10):
    def reciprocal_rank(actual, predicted):
        for i, p in enumerate(predicted[:k]):
            if p in actual:
                return 1.0 / (i + 1.0)
        return 0.0

    pred_items = predictions.groupBy('user_id').agg(F.collect_list('asin').alias('predicted_items'))
    actual_items = test_df.groupBy('user_id').agg(F.collect_list('asin').alias('actual_items'))

    pred_actual = pred_items.join(actual_items, 'user_id')

    mrrk = pred_actual.rdd.map(lambda row: reciprocal_rank(row.actual_items, row.predicted_items)).mean()
    return mrrk

In [28]:
rmse = evaluate_rmse(model, test_df)
print(f'Root-mean-square error = {rmse}')

24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:23 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:24 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:24 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
                                                                                

Root-mean-square error = 2.885661724451385


24/06/17 02:33:24 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


In [29]:
test_df = test_df.withColumnRenamed('itemIndex', 'trueItemIndex').withColumnRenamed('userIndex', 'trueUserIndex')
recommendations_with_test = recommendations.join(test_df, (recommendations.user_id == test_df.user_id) & (recommendations.itemIndex == test_df.trueItemIndex), 'left').select(recommendations['*'], test_df['trueItemIndex'])

In [30]:
total_users, avg_precision, avg_recall = precision_recall_at_k(recommendations_with_test, k = 10)
mapk = map_at_k(recommendations_with_test, k = 10)
ndcgk = ndcg_at_k(recommendations_with_test, k = 10)
mrrk = mrr_at_k(recommendations_with_test, k = 10)


print(f'Total Users = {total_users}')
print(f'Average Precision = {avg_precision}')
print(f'Average Recall = {avg_recall}')
print(f'MAP at 10: {mapk}')
print(f'NDCG at 10: {ndcgk}')
print(f'MRR at 10: {mrrk}')

24/06/17 02:33:25 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:25 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/06/17 02:33:27 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/06/17 02:33:27 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:28 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:28 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:29 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/06/17 02:33:29 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/06/17 02:33:31 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/06/17 02:33:31 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:32 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/06/17 02:33:32 WARN DAGScheduler: Broadcasting larg

Total Users = 55259
Average Precision = 9.048299824462984e-06
Average Recall = 9.048299824462984e-06
MAP at 10: 0.009095378564405114
NDCG at 10: 0.010506213308686875
MRR at 10: 0.009095378564405114


24/06/17 02:33:47 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
