**Paso 1: Importar las librerias necesarias y configurar Spark**

In [1]:
!pip install pyspark
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("Film Recommender").getOrCreate()


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=9fb3255d8dd0f1c162b8960d0fa26467c3e3fea98b15a66e1c5494998fe44fce
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


**Paso 2: Datos de Movie Lens**
Los datos son de Movie Lens, obtenido de https://grouplens.org/datasets/movielens/latest/ El conjunto de datos cuenta con 27.000.000 valoraciones y 1.100.000 aplicaciones de etiquetas aplicadas a 58.000 películas por 280.000 usuarios. Incluye datos del genoma de etiquetas con 14 millones de puntuaciones de relevancia en 1.100 etiquetas.

Los datos ya se encuentran en un csv sin embargo estos se encuentran en diferentes archivos, se juntará el de películas y el de ratings según usuario.

In [2]:
import pandas as pd

In [41]:
#Reading users file:
users= pd.read_csv('ratings.csv', sep=',')
# Checking shape of users files and head
print(users.shape)
users.head()

(5453595, 4)


Unnamed: 0,userId,movieId,rating,timestamp
0,1,307,3.5,1256677221
1,1,481,3.5,1256677456
2,1,1091,1.5,1256677471
3,1,1257,4.5,1256677460
4,1,1449,4.5,1256677264


In [4]:
# Reading the movie data
movies= pd.read_csv('movies.csv')
# Checking shape of movie data and look first 5 rows
print(movies.shape)
movies.head()

(58098, 3)


Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [21]:
# Combining the data on same column
df= pd.merge(users, movies, on= 'movieId')
del df['timestamp']
df.head()

Unnamed: 0,userId,movieId,rating,title,genres
0,1,307,3.5,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama
1,6,307,4.0,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama
2,56,307,4.0,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama
3,71,307,5.0,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama
4,84,307,3.0,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama


In [22]:
df.to_csv('MovieUserRating.csv')

In [14]:
# Promedio de Ratings
f = {'movieId': 'first', 'genres': 'first'}
ratings = pd.DataFrame(df.groupby('title')['rating'].mean())
ratings.head()

# Conteo de Ratings
ratings['rating_counts']= pd.DataFrame(df.groupby('title')['rating'].count())
ratings.head()

Unnamed: 0_level_0,rating,rating_counts
title,Unnamed: 1_level_1,Unnamed: 2_level_1
'71 (2014),3.0,1
'Hellboy': The Seeds of Creation (2004),3.0,1
'Round Midnight (1986),4.5,1
'Salem's Lot (2004),3.5,1
'Til There Was You (1997),2.5,2


In [15]:
mov= pd.merge(ratings, movies, on= 'title')
mov.head()

Unnamed: 0,title,rating,rating_counts,movieId,genres
0,'71 (2014),3.0,1,117867,Action|Drama|Thriller|War
1,'Hellboy': The Seeds of Creation (2004),3.0,1,97757,Action|Adventure|Comedy|Documentary|Fantasy
2,'Round Midnight (1986),4.5,1,26564,Drama|Musical
3,'Salem's Lot (2004),3.5,1,27751,Drama|Horror|Mystery|Thriller
4,'Til There Was You (1997),2.5,2,779,Drama|Romance


**Paso 3: Leer contenido del archivo CSV y transforma a un DataFrame**


In [29]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("CSV Reader").getOrCreate()

# Set the options for reading the CSV file
csv_options = {
    "header": "true",
    "encoding": "utf-8",
    "sep": ","
}

# Load the MovieLens dataset
data_path = "MovieUserRating.csv"
df = spark.read.format("csv").options(**csv_options).load(data_path)

**Paso 7: Mostrar el contenido del DataFrame obtenido del paso anterior**

In [30]:
df.head()

Row(_c0='0', userId='1', movieId='307', rating='3.5', title='Three Colors: Blue (Trois couleurs: Bleu) (1993)', genres='Drama')

**Paso 8: Pre-procesamiento del conjunto de datos**

In [31]:
# Select relevant columns and rename them
df = df.select("userId", "movieId", "rating").withColumnRenamed("userId", "user").withColumnRenamed("movieId", "movie")

# Convert rating column to float
df = df.withColumn("rating", df["rating"].cast("float"))

# Filter out any invalid or missing values
df = df.filter(df["user"].isNotNull() & df["rating"].isNotNull() & df["movie"].isNotNull())


**Paso 9: Dividir le conjunto de datos en dos subconjuntos - Train y Test**

In [32]:
# Split the data into training and testing sets (80% for training, 20% for testing)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


**Paso 10: Indexazacion de valores no numericos (user, song) y entrenamiento del modelo**

In [34]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

# Create StringIndexers for the user and song columns
user_indexer = StringIndexer(inputCol="user", outputCol="userIndex")
movie_indexer = StringIndexer(inputCol="movie", outputCol="movieIndex")

# Fit StringIndexers and transform the data
indexed_data = user_indexer.fit(train_data).transform(train_data)
indexed_data = movie_indexer.fit(indexed_data).transform(indexed_data)

# Create an ALS recommender model
als = ALS(userCol="userIndex", itemCol="movieIndex", ratingCol="rating", nonnegative=True)

# Fit the model to the training data
model = als.fit(indexed_data)



**Paso 11: Indexazacion de valores no numericos (user, song) y generacion del conjunto de prueba**

In [35]:
user_indexer = StringIndexer(inputCol="user", outputCol="userIndex")
movie_indexer = StringIndexer(inputCol="movie", outputCol="movieIndex")

# Fit StringIndexers and transform the data
indexed_test_data = user_indexer.fit(test_data).transform(test_data)
indexed_test_data = movie_indexer.fit(indexed_test_data).transform(indexed_test_data)


**Paso 12: Genera 5 recomendaciones para cada usuario en el conjunto de datos de prueba**

In [36]:
# Generate top 5 recommendations for each user in the test data
recommendations = model.recommendForUserSubset(indexed_test_data, 5)

**Paso 13: Mostrar recomendaciones**

In [37]:
# Show the recommendations
recommendations.show(truncate=False)


+---------+-----------------------------------------------------------------------------------------------+
|userIndex|recommendations                                                                                |
+---------+-----------------------------------------------------------------------------------------------+
|0        |[{5968, 5.843327}, {8565, 5.5709853}, {6562, 5.5074363}, {9061, 5.3523984}, {3303, 5.3498135}] |
|1        |[{5968, 4.5784}, {6562, 4.456378}, {8565, 4.3860097}, {7837, 4.0341554}, {3303, 4.031731}]     |
|2        |[{6562, 4.968369}, {5677, 4.8559165}, {6395, 4.854005}, {8565, 4.796129}, {5968, 4.62594}]     |
|3        |[{8565, 4.7827754}, {6562, 4.511468}, {5677, 4.4252925}, {5666, 4.414826}, {9061, 4.411838}]   |
|4        |[{6562, 4.867956}, {5968, 4.831729}, {8167, 4.7461157}, {8565, 4.729965}, {5877, 4.726344}]    |
|5        |[{6918, 5.546693}, {6917, 5.546693}, {4135, 5.014857}, {5677, 4.9348044}, {4387, 4.9245086}]   |
|6        |[{9061, 5.8312006

In [40]:
from pyspark.sql.functions import col

id_to_retrieve = 1

filtered_data = indexed_data.filter(col("movieIndex") == id_to_retrieve)

movie_index = filtered_data.select("movie").collect()[0][0]

print(movie_index)

df_original = spark.read.format("csv").options(**csv_options).load(data_path)

artist_name = (df_original.filter(col("movieId") == movie_index)).select("title").collect()[0][0]

print(artist_name)

genre_name = (df_original.filter(col("movieId") == movie_index)).select("genres").collect()[0][0]

print(genre_name)

296
Pulp Fiction (1994)
Comedy|Crime|Drama|Thriller
