## Join, Aggregate y Window en profundidad

### Retos

1. Limpieza y preparación
    - Convertir timestamp a tipo fecha.
    - Extraer el año desde el campo title.
    - Validar que movieId es único en movies.
    - Comprobar que todas los ratings hacen referencia a películas existentes en movies.csv
2. Joins
    - Unir ratings y movies por movieId, añadiendo a ratings las columnas de movies: title, views y quality.
3. Aggregations
    - Calcular el rating promedio por cada película
    - Calcular el rating promedio por cada género
    - Calcular el número de votos por cada película
    - Obtener el/los usuario/s que tiene la puntuación más alta y el/los de la más baja
    - Obtener la/s película/s con más de 5 votos que tienen el rating más polarizado (mayor varianza)
4. Función Window
    - Ranking de películas mejor valoradas por género
    - Extraer el Top 2 de cada género y guardar en formato Parquet particionado por género. Tras esto, leer el top para el género 'Children' y 'Comedy'

Entorno

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType

schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Crear la sesión de Spark
spark = SparkSession.builder.appName("PySpark02-challenge").getOrCreate()

movies = spark.read.csv("../../data/movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("../../data/ratings.csv", header=True, inferSchema=True)

### Resultados

Apartado 1

In [None]:
# Transformación de timestamp a fecha
ratings_t = ratings.withColumn("date", f.to_date(f.from_unixtime('timestamp')))

# Extracción de la columna año del título
movies_t = movies.withColumn('year', f.regexp_extract(f.col('title'), r"\((\d{4})\)", 1))

print('Películas repetidas:')
movies.groupBy('movieId').agg(f.count("*").alias('repetitions')).filter("repetitions > 1").show()

print('Valoraciones que no corresponden a ninguna de las películas de movies.csv:')
ratings.join(movies.select("movieId", "title"), on="movieId", how='left_anti').show()

Apartado 2

In [None]:
join_movies_rating = ratings_t.join(movies_t.select("movieId", "title", "genres", "quality"), on="movieId", how='inner')
join_movies_rating.show()

Apartado 3

In [None]:
# Media de valoraciones por película
avg_ratings_movies = join_movies_rating.groupBy('title').agg(f.avg('rating').alias('movie_rating_average'))

# Media de valoraciones por género
normalize_genre = join_movies_rating.withColumn('genre', f.explode(f.split('genres', '\|')))
avg_ratings_genres = normalize_genre.groupBy('genre').agg(f.avg('rating').alias('genre_rating_average'))

# Número de votos de cada película
valorations_movie = join_movies_rating.groupBy('title').agg(f.count('*').alias('num_ratings'))

# Usuarios que han dado la valoración más alta y más baja
max_rating_users = ratings_t.groupBy().agg(f.max('rating').alias('max_rating')).join(ratings_t, f.col('max_rating') == f.col('rating'))
min_rating_users = ratings_t.groupBy().agg(f.min('rating').alias('min_rating')).join(ratings_t, f.col('min_rating') == f.col('rating'))

# Película/s con más de 5 votos que tienen el rating más polarizado
rating_num_and_variance = join_movies_rating.groupBy('title').agg(f.count('rating').alias('num_ratings'), f.variance('rating').alias('rating_variance'))
most_polarized = rating_num_and_variance.filter(f.col('num_ratings') > 5).orderBy(f.col('rating_variance').desc())

Apartado 4

In [None]:
# Calculamos de nuevo la media de votaciones de cada película para elegir las mejores valoradas
movie_rating_avg = join_movies_rating.groupBy('movieId').agg(f.avg('rating').alias('rating_average'))
join_movies_ratingavg = movies_t.join(movie_rating_avg, on='movieId', how='left')

normalize_genre = join_movies_ratingavg.withColumn('genre', f.explode(f.split('genres', '\|')))

# Ventana que indica la columna de partición y el orden requerido para el ranking
window_spec = Window.partitionBy("genre").orderBy(f.col("rating_average").desc())
# Ranking de películas mejor valoradas
ranking = normalize_genre.withColumn("rank", f.row_number().over(window_spec))

# Extraer el top 2 por género del ranking y almacenar en particiones por género
top3 = ranking.select('title', 'views', 'year', 'genre', 'rating_average', 'rank').filter(f.col("rank") < 3)
top3.write.partitionBy("genre").parquet("data/top2_partition_by_genre.parquet")

# Leer los top almacenados para el género 'Children' y 'Comedy'
print('Top 2 películas de Niños:')
spark.read.parquet("./data/top2_partition_by_genre.parquet/genre=Children").show()

print('Top 2 películas de Comedia:')
spark.read.parquet("./data/top2_partition_by_genre.parquet/genre=Comedy").show()

In [126]:
spark.stop()