In [180]:
from pyspark.sql import SparkSession
    
spark = SparkSession.builder.appName('recommender').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
from pyspark.sql.functions import struct, collect_list, explode
import json
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType

In [181]:
data_cols = ['userid','movieid','rating','timestamp']
item_cols = ['movieid','movietitle','release date',
'video release date','IMDb URL','unknown','Action',
'Adventure','Animation','Childrens','Comedy','Crime',
'Documentary','Drama','Fantasy','Film-Noir','Horror',
'Musical','Mystery','Romance ','Sci-Fi','Thriller',
'War' ,'Western']
user_cols = ['userid','age','gender','occupation',
'zip code']

In [182]:
users = pd.read_csv('/home/bella/ml-100k/u.user', sep='|',
names=user_cols, encoding='latin-1')

In [183]:
item = pd.read_csv('/home/bella/ml-100k/u.item', sep='|',
names=item_cols, encoding='latin-1')
data = pd.read_csv('/home/bella/ml-100k/u.data', sep='\t',
names=data_cols, encoding='latin-1')

In [185]:
dataset = pd.merge(pd.merge(item, data),users)
df = dataset[['userid','movieid','movietitle','rating','timestamp']]
dataframe =spark.createDataFrame(df)
#ratings = dataframe.rdd


In [186]:
dataframe.printSchema()

root
 |-- userid: long (nullable = true)
 |-- movieid: long (nullable = true)
 |-- movietitle: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [187]:
dataframe.show()

+------+-------+--------------------+------+---------+
|userid|movieid|          movietitle|rating|timestamp|
+------+-------+--------------------+------+---------+
|   308|      1|    Toy Story (1995)|     4|887736532|
|   308|      4|   Get Shorty (1995)|     5|887737890|
|   308|      5|      Copycat (1995)|     4|887739608|
|   308|      7|Twelve Monkeys (1...|     4|887738847|
|   308|      8|         Babe (1995)|     5|887736696|
|   308|      9|Dead Man Walking ...|     4|887737194|
|   308|     11|Seven (Se7en) (1995)|     5|887737837|
|   308|     12|Usual Suspects, T...|     5|887737243|
|   308|     15|Mr. Holland's Opu...|     3|887739426|
|   308|     17|From Dusk Till Da...|     4|887739056|
|   308|     19|Antonia's Line (1...|     3|887737383|
|   308|     21|Muppet Treasure I...|     3|887740729|
|   308|     22|   Braveheart (1995)|     4|887737647|
|   308|     23|  Taxi Driver (1976)|     5|887737293|
|   308|     24|Rumble in the Bro...|     4|887738057|
|   308|  

In [188]:
dataframe.describe().show()

+-------+----------------+-----------------+--------------------+------------------+-----------------+
|summary|          userid|          movieid|          movietitle|            rating|        timestamp|
+-------+----------------+-----------------+--------------------+------------------+-----------------+
|  count|          100000|           100000|              100000|            100000|           100000|
|   mean|       462.48475|        425.53013|                null|           3.52986|8.8352885148862E8|
| stddev|266.614420127509|330.7983563255838|                null|1.1256735991443205|5343856.189502763|
|    min|               1|                1|'Til There Was Yo...|                 1|        874724710|
|    max|             943|             1682|Á köldum klaka (C...|                 5|        893286638|
+-------+----------------+-----------------+--------------------+------------------+-----------------+



In [189]:
training, test = dataframe.randomSplit([0.8,0.2])

In [191]:
als = ALS(maxIter=5, regParam=0.01, userCol='userid', itemCol='movieid', ratingCol='rating')

model = als.fit(training)

predictions = model.transform(test)

In [192]:
predictions.describe().show()

+-------+-----------------+------------------+--------------------+------------------+-------------------+----------+
|summary|           userid|           movieid|          movietitle|            rating|          timestamp|prediction|
+-------+-----------------+------------------+--------------------+------------------+-------------------+----------+
|  count|            20101|             20101|               20101|             20101|              20101|     20101|
|   mean|462.5742500373116|428.08641361126314|                null| 3.515297746380777|8.835338370583055E8|       NaN|
| stddev|266.1794384514314| 330.8215446023915|                null|1.1312069445474744|  5337656.962515463|       NaN|
|    min|                1|                 1|'Til There Was Yo...|                 1|          874724937| -7.307976|
|    max|              943|              1676|Young Poisoner's ...|                 5|          893286638|       NaN|
+-------+-----------------+------------------+----------

In [194]:
predictions = predictions.na.drop()
predictions.describe().show()

+-------+-----------------+------------------+--------------------+-----------------+-------------------+------------------+
|summary|           userid|           movieid|          movietitle|           rating|          timestamp|        prediction|
+-------+-----------------+------------------+--------------------+-----------------+-------------------+------------------+
|  count|            20065|             20065|               20065|            20065|              20065|             20065|
|   mean|462.5799152753551|426.13650635434834|                null|3.517069524046848|8.835299807362074E8|3.4959238094619063|
| stddev|266.2740627633327|  327.849848834244|                null|1.129970935287274|  5337075.121335574|0.9425125063004706|
|    min|                1|                 1|'Til There Was Yo...|                1|          874724937|         -7.307976|
|    max|              943|              1652|Young Poisoner's ...|                5|          893286638|          8.859144|


In [195]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.06994243562


In [196]:
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [197]:
# Generate top 10 movie recommendations for a specified set of users
users = dataframe.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
    # Generate top 10 user recommendations for a specified set of movies
movies = dataframe.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
   
#userRecs.show()
#movieRecs.show()
userSubsetRecs.show()
movieSubSetRecs.show()

+------+--------------------+
|userid|     recommendations|
+------+--------------------+
|    26|[[1643, 5.1165543...|
|   474|[[1368, 6.7601976...|
|    29|[[906, 6.107493],...|
+------+--------------------+

+-------+--------------------+
|movieid|     recommendations|
+-------+--------------------+
|     26|[[180, 7.069045],...|
|    474|[[309, 7.550808],...|
|     29|[[127, 5.9479694]...|
+-------+--------------------+



In [198]:
userSubsetRecs.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [121]:
_df = userSubsetRecs\
.select(explode(userSubsetRecs.recommendations.movieid),'userid')\
#.withColumn("rating",explode(userSubsetRecs.recommendations.movieid))


In [127]:
_df1 = userSubsetRecs\
.withColumn('rating',explode(userSubsetRecs.recommendations.rating))\
.withColumn('movieid',explode(userSubsetRecs.recommendations.movieid))\


In [135]:
rd=_df1.drop('recommendations').collect()

In [146]:
schema = StructType([StructField("userId", StringType(), True), StructField("rating", FloatType(), True), StructField("movieid", StringType(), True)])


In [166]:
dfToSave = spark.createDataFrame(rd, schema)

In [199]:
dataframe.createOrReplaceTempView("m")

In [171]:
dfToSave.createOrReplaceTempView("re")

In [209]:
sql =spark.sql("select  re.userid,m.movietitle  from re join m on re.movieid= m.movieid group by re.userid,m.movietitle order by userid")

In [210]:
sql.show()

+------+--------------------+
|userid|          movietitle|
+------+--------------------+
|    26|  Bitter Moon (1992)|
|    26|        Faust (1994)|
|    26|Slingshot, The (1...|
|    26| Pulp Fiction (1994)|
|    26|Pather Panchali (...|
|    26|When We Were King...|
|    26|Mina Tannenbaum (...|
|    26|World of Apu, The...|
|    26|My Man Godfrey (1...|
|    26|    Boys, Les (1997)|
|    29|        Laura (1944)|
|    29|    Boys, Les (1997)|
|    29|Paradise Lost: Th...|
|    29|       Priest (1994)|
|    29|        Naked (1993)|
|    29|Burnt By the Sun ...|
|    29|Ma vie en rose (M...|
|    29|Umbrellas of Cher...|
|    29|       Caught (1996)|
|    29|    Ninotchka (1939)|
+------+--------------------+
only showing top 20 rows



In [211]:
_delimiter=','

In [212]:
_output='/home/bella/recommendations'

In [214]:
_xy=sql.coalesce(1).write.format('com.databricks.spark.csv').option('header','true').option('delimiter', _delimiter).mode("overwrite").save(_output)