In [1]:
#import required modules
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext 
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
from pyspark.ml.recommendation import ALS 
from pyspark.sql.types import FloatType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from pyspark.sql.functions import explode

In [2]:
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

In [3]:
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("CLIENTES_EXTRACCION.csv",header=True)

In [4]:
df.show(5)

+----------+-----------+-------+------+
|ID_CLIENTE|ID_SSS_TIPO|COMPRAS|BIENES|
+----------+-----------+-------+------+
|         0|         39|      1|     1|
|         0|        133|      1|     1|
|         1|        247|      1|     1|
|         1|        205|      1|     1|
|         2|         46|      2|     6|
+----------+-----------+-------+------+
only showing top 5 rows



In [5]:
datosAgregados = (df.select("ID_CLIENTE","ID_SSS_TIPO","COMPRAS")
                           .where(col("ID_SSS_TIPO").isNotNull())
                           .where(col("ID_CLIENTE").isNotNull()))

In [6]:
datosAgregados.show(900)

+----------+-----------+-------+
|ID_CLIENTE|ID_SSS_TIPO|COMPRAS|
+----------+-----------+-------+
|         0|         39|      1|
|         0|        133|      1|
|         1|        247|      1|
|         1|        205|      1|
|         2|         46|      2|
|         2|        340|      1|
|         2|         82|      1|
|         2|        270|      1|
|         2|         86|      1|
|         3|        298|      2|
|         3|         10|      1|
|         3|         46|      3|
|         3|         53|      1|
|         3|        339|      3|
|         3|        341|      2|
|         3|        351|      2|
|         4|        247|      2|
|         4|        339|     10|
|         4|        341|      5|
|         4|        343|      1|
|         4|         82|      1|
|         4|         46|     13|
|         4|        270|      2|
|         5|        298|      2|
|         5|         82|      1|
|         5|         46|      2|
|         5|        339|      1|
|         

In [7]:
(training, testData) = datosAgregados.randomSplit([0.7, 0.3])

In [8]:
# training the model
#als = ALS(rank=5, maxIter=10, seed=0)
#model = als.fit(X_train.select(["user", "item", "rating"]))
als = ALS(maxIter=10, rank=10, regParam=0.05, alpha =10.0, userCol="ID_CLIENTE", itemCol="ID_SSS_TIPO", ratingCol="COMPRAS",coldStartStrategy="drop",implicitPrefs=True)
model = als.fit(training)

In [9]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(testData)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="COMPRAS",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 3.7593251975735504


In [10]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

In [11]:
# Generate top 10 user recommendations for each subsubsubtype
itemRecs = model.recommendForAllItems(10)

In [12]:
userRecs.show(5)

+----------+--------------------+
|ID_CLIENTE|     recommendations|
+----------+--------------------+
|      1580|[[339, 0.88958037...|
|      4900|[[335, 0.7699785]...|
|      6620|[[270, 0.855693],...|
|      7240|[[6, 1.0487674], ...|
|      7880|[[39, 0.65885174]...|
+----------+--------------------+
only showing top 5 rows



In [33]:
#pasar a dataframe final por columnas
dataframeFinal = userRecs.select("ID_CLIENTE",explode(col("recommendations")).alias("BienID"))
dataframeFinal = dataframeFinal.select("ID_CLIENTE",col("BienID.*"))
display(dataframeFinal)

DataFrame[ID_CLIENTE: int, ID_SSS_TIPO: int, rating: float]

In [34]:
dataframeFinal.show(3)

+----------+-----------+----------+
|ID_CLIENTE|ID_SSS_TIPO|    rating|
+----------+-----------+----------+
|      1580|         46|0.98126346|
|      1580|        339|0.89072156|
|      1580|        270| 0.7417565|
+----------+-----------+----------+
only showing top 3 rows



In [35]:
dataframeFinal.toPandas().to_csv('SPARK_recomendaciones_usuarios_compras.csv',index=True)

In [13]:
itemRecs.show(5)

+-----------+--------------------+
|ID_SSS_TIPO|     recommendations|
+-----------+--------------------+
|        148|[[45106, 1.056832...|
|        243|[[1765, 0.0404132...|
|         31|[[1765, 1.0222516...|
|        251|[[1765, 0.8921031...|
|         85|[[1765, 0.7585447...|
+-----------+--------------------+
only showing top 5 rows



In [14]:
#pasar a dataframe final por columnas
dataframeFinal_items = itemRecs.select("ID_SSS_TIPO",explode(col("recommendations")).alias("ClienteID"))
dataframeFinal_items = dataframeFinal_items.select("ID_SSS_TIPO",col("ClienteID.*"))
display(dataframeFinal_items)

DataFrame[ID_SSS_TIPO: int, ID_CLIENTE: int, rating: float]

In [15]:
dataframeFinal_items.show(3)

+-----------+----------+---------+
|ID_SSS_TIPO|ID_CLIENTE|   rating|
+-----------+----------+---------+
|        148|     45106|1.0568324|
|        148|     10387|1.0450283|
|        148|     18367|1.0293658|
+-----------+----------+---------+
only showing top 3 rows



In [16]:
dataframeFinal_items.toPandas().to_csv('SPARK_recomendaciones_ssstipos.csv',index=True)