# Collaborative Filtering

Movie ratings from [MovieLens](http://movielens.org)
> The data are available here: <http://grouplens.org/datasets/>.

We will build a recommendation model.

<img src="https://movielens.org/images/site/main-screen.png" height="10%" width="10%">

## 1. Importing the ALS algorithm

In [5]:
# import ALS
from pyspark.ml.recommendation import ALS

## 2. Loading the data
Two files need to be imported: the ratings file and the movies file.

In [7]:
# Ratings file

file_location = "/FileStore/tables/ratings.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

ratings = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(ratings)


userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179
1,1061,3.0,1260759182
1,1129,2.0,1260759185
1,1172,4.0,1260759205
1,1263,2.0,1260759151
1,1287,2.0,1260759187
1,1293,2.0,1260759148
1,1339,3.5,1260759125
1,1343,2.0,1260759131


In [8]:
# Movies file

file_location = "/FileStore/tables/movies.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

movies = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(movies)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


## 3. Merging the data

In [10]:
# Join the two data sources on their shared "movieId" column

fusion = ratings.join(movies, "movieId")
display(fusion)

movieId,userId,rating,timestamp,title,genres
31,1,2.5,1260759144,Dangerous Minds (1995),Drama
1029,1,3.0,1260759179,Dumbo (1941),Animation|Children|Drama|Musical
1061,1,3.0,1260759182,Sleepers (1996),Thriller
1129,1,2.0,1260759185,Escape from New York (1981),Action|Adventure|Sci-Fi|Thriller
1172,1,4.0,1260759205,Cinema Paradiso (Nuovo cinema Paradiso) (1989),Drama
1263,1,2.0,1260759151,"Deer Hunter, The (1978)",Drama|War
1287,1,2.0,1260759187,Ben-Hur (1959),Action|Adventure|Drama
1293,1,2.0,1260759148,Gandhi (1982),Drama
1339,1,3.5,1260759125,Dracula (Bram Stoker's Dracula) (1992),Fantasy|Horror|Romance|Thriller
1343,1,2.0,1260759131,Cape Fear (1991),Thriller


In [12]:
# If the data are in Azure Blob Storage:
#ratings = spark.read.csv('wasb://spark@<YOUR_ACCOUNT>.blob.core.windows.net/data/ratings.csv', inferSchema=True, header=True)
#movies = spark.read.csv('wasb://spark@<YOUR_ACCOUNT>.blob.core.windows.net/data/movies.csv', inferSchema=True, header=True)
#ratings.join(movies, "movieId").show()

## 4. Splitting the data

In [14]:
# Keep only the columns ALS needs
data = ratings.select("userId", "movieId", "rating")
# Random 70/30 split: the exact row counts vary from run to run
splits = data.randomSplit([0.7, 0.3])

train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
train_rows = train.count()
test_rows = test.count()

print()
print("Number of rows in the training set:", train_rows)
print("Number of rows in the test set:", test_rows)
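
Because `randomSplit` assigns rows at random, the two counts printed above are only approximately 70 % and 30 % of the data, and they change between runs unless a seed is supplied (for example `data.randomSplit([0.7, 0.3], seed=42)`). The plain-Python sketch below illustrates the idea of a seeded, per-row random split. It is not Spark's implementation; the function name `random_split` and the sample rows are assumptions chosen for illustration.

```python
import random

def random_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train_part, test_part = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train_part.append(row)
        else:
            test_part.append(row)
    return train_part, test_part

# (userId, movieId, rating) triples taken from the ratings file shown earlier
rows = [(1, 31, 2.5), (1, 1029, 3.0), (1, 1061, 3.0), (1, 1129, 2.0), (1, 1172, 4.0)]
train_part, test_part = random_split(rows, seed=42)
```

Calling `random_split` twice with the same seed returns the same two lists, which is what makes results comparable across runs.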

## 5. Building the model

In [16]:
# ALS estimator: ratings are read from the "label" column of the training set
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="label")

# Fit the model
model = als.fit(train)
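
For intuition about what `als.fit` estimates: ALS factorizes the ratings into user factors and item factors, and alternates between solving for one set while the other is held fixed. The sketch below is a deliberately simplified plain-Python version restricted to a single latent factor (rank 1). It is not Spark's implementation; the function name `als_rank1`, the zero-based toy ratings, and the plain L2 penalty are assumptions made for illustration. Its `reg` and `max_iter` arguments play the roles of `regParam` and `maxIter`.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, max_iter=5):
    """Rank-1 alternating least squares on (user, item, rating) triples.

    Returns (user_factors, item_factors); a rating is predicted as
    user_factors[user] * item_factors[item].
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(max_iter):
        # Hold item factors fixed; solve a regularized least-squares problem per user.
        num = [0.0] * n_users
        den = [reg] * n_users
        for i, j, r in ratings:
            num[i] += r * v[j]
            den[i] += v[j] * v[j]
        u = [n / d for n, d in zip(num, den)]

        # Hold user factors fixed; solve the same kind of problem per item.
        num = [0.0] * n_items
        den = [reg] * n_items
        for i, j, r in ratings:
            num[j] += r * u[i]
            den[j] += u[i] * u[i]
        v = [n / d for n, d in zip(num, den)]
    return u, v

# Toy data: user 1 rates everything twice as high as user 0.
# The pair (user 1, item 2) is left unobserved on purpose.
toy = [(0, 0, 1.0), (0, 1, 1.5), (0, 2, 2.0), (1, 0, 2.0), (1, 1, 3.0)]
u, v = als_rank1(toy, n_users=2, n_items=3, max_iter=50)
predicted = u[1] * v[2]  # estimate for the unobserved pair
```

With a single factor each update reduces to a ratio of sums; with more factors each user and item update becomes a small linear system instead.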

## 6. Testing the model

In [18]:
# Apply the model to the test set
prediction = model.transform(test)

In [19]:
# Inspect the results (prediction = model output, trueLabel = original rating)
display(prediction.join(movies, "movieId").select("userId", "title", "prediction", "trueLabel"))

userId,title,prediction,trueLabel
380,Guilty as Sin (1993),2.7830827,3.0
242,Guilty as Sin (1993),4.0215945,4.0
311,Guilty as Sin (1993),2.780362,3.0
588,"Hudsucker Proxy, The (1994)",3.737686,3.0
126,"Hudsucker Proxy, The (1994)",4.1601615,5.0
460,"Hudsucker Proxy, The (1994)",3.7650576,5.0
548,"Hudsucker Proxy, The (1994)",3.8361897,4.0
285,"Hudsucker Proxy, The (1994)",3.5373683,5.0
274,"Hudsucker Proxy, The (1994)",3.8789995,5.0
306,"Hudsucker Proxy, The (1994)",3.3523557,3.0
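
The gap between `prediction` and `trueLabel` can be summarized in a single number such as the root-mean-square error (RMSE). The plain-Python sketch below shows the computation on the first three rows of the output above. The function name `rmse` is an assumption for illustration; in Spark itself the usual route would be `RegressionEvaluator` from `pyspark.ml.evaluation` with `metricName="rmse"`, `labelCol="trueLabel"` and `predictionCol="prediction"`. The sketch skips NaN predictions, which ALS can produce for users or movies that appear only in the test set (setting `coldStartStrategy="drop"` on the `ALS` estimator removes such rows).

```python
import math

def rmse(pairs):
    """Root-mean-square error over (prediction, true_label) pairs, skipping NaN predictions."""
    kept = [(p, t) for p, t in pairs if not math.isnan(p)]
    if not kept:
        raise ValueError("no valid predictions to evaluate")
    return math.sqrt(sum((p - t) ** 2 for p, t in kept) / len(kept))

# First three rows of the output above: (prediction, trueLabel)
sample = [(2.7830827, 3.0), (4.0215945, 4.0), (2.780362, 3.0)]
sample_rmse = rmse(sample)
```

A lower RMSE means predictions sit closer to the held-out ratings; the value is expressed in the same units as the ratings themselves.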
