In [None]:
# Global data variables
SANDBOX_NAME = # Sandbox Name
DATA_PATH = "/data/sandboxes/" + SANDBOX_NAME + "/data/data/" 



# Combinando DataFrames

En _pyspark_ hay dos formas de combinar los datos de DataFrames.
* por filas: `join`
* por columnas: `union`

In [None]:
movies_df = spark.read.csv(DATA_PATH + 'movie-ratings/movies.csv', sep=',', header=True, inferSchema=True)
ratings_df = spark.read.csv(DATA_PATH + 'movie-ratings/ratings.csv', sep=',', header=True, inferSchema=True)

In [None]:
movies_df.show(5)

In [None]:
ratings_df.show(5)



## join

Añadamos a cada rating el título y el género de la pelicula.

In [None]:
ratings_movies_df = ratings_df.join(movies_df, on='movieId', how='inner')

In [None]:
ratings_movies_df.show(5)

 

Si la columna de unión tuviera distinto nombre en ambos DataFrames

In [None]:
movies2_df = movies_df.withColumnRenamed('movieId', 'id_movie')
movies2_df.show(2)

In [None]:
ratings_movies2_df = ratings_df.join(movies2_df, 
                                     on=[ratings_df['movieId'] == movies2_df['id_movie']], how='outer')

In [None]:
ratings_movies2_df.show(5)



## union

Imagina que tuvieramos un DataFrame con las películas de terror y otro con las de comedia y quisieramos unirlo todo en uno.

In [None]:
from pyspark.sql import functions as F



Crea un dataframe que contenga solamente aquellas películas que sean del género `Horror` y otro que contenga aquellas del género `Comedy`.

In [None]:
horror_df = movies_df.filter(F.col('genres') == 'Horror')
comedy_df = movies_df.filter(F.col('genres') == 'Comedy')

In [None]:
horror_df.select('genres').distinct().show()

In [None]:
comedy_df.select('genres').distinct().show()



Mediante `union()` se pueden unir ambos dataframes. Comprueba que en el nuevo dataframe generado existen ambos géneros.

In [None]:
horror_comedy_df = horror_df.union(comedy_df)

In [None]:
horror_comedy_df.select('genres').distinct().show()



# Persistiendo DataFrames

Debido al concepto de *lazy_evaluation* de Spark cada vez que realicemos una acción sobre el DataFrame `ratings_movies_df` se ejecutara la operación de _join_. Este tipo de operación es muy costosa computacionalmente por lo que es recomedable realizar un `cache` o `persist` sobre el DataFrame para evitar ejecutarla multiples veces.

Al persistir un DataFrame se guarda temporalmente el resultado del DAG hasta el punto donde se cachea el DataFrame, evitando que se ejecute esa parte repetidas veces con cada acción.



Por ejemplo, si queremos contar el número de títulos únicos con rating de 5 y también con rating de 1:

__ineficiente:__

In [None]:
%%time

ratings_movies_df.filter(F.col('rating') == 5).select('title').distinct().count()

In [None]:
%%time

ratings_movies_df.filter(F.col('rating') == 1).select('title').distinct().count()



__eficiente__

Observa que la primera acción puede ser incluso más lenta que la anterior ya que se está guardando el resultado. En cambio la segunda acción es mucho más rápida al no necesitar volver a ejecutar el join.

In [None]:
ratings_movies_df.persist()

In [None]:
%%time

ratings_movies_df.filter(F.col('rating') == 5).select('title').distinct().count()

In [None]:
%%time

ratings_movies_df.filter(F.col('rating') == 1).select('title').distinct().count()



Es importante borrar los DataFrames cacheados cuando no se vuelven a utilizar.

In [None]:
ratings_movies_df.unpersist()



Es posible elegir si el guardado temporal se hace en memoria, en disco, o en ambas.

In [None]:
from pyspark import StorageLevel

In [None]:
ratings_movies_df.persist(storageLevel=StorageLevel.DISK_ONLY)

In [None]:
ratings_df.unpersist()

In [None]:
ratings_movies_df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

In [None]:
ratings_df.unpersist()

In [None]:
ratings_movies_df.persist(storageLevel=StorageLevel.MEMORY_ONLY)

In [None]:
ratings_df.unpersist()

In [None]:
spark.catalog.clearCache()

 

**Resumen de formas de persistencia**

_Si queremos persistir un dataframe en memoria:_ `df.cache()`

_Si queremos persistir un dataframe en memoria, en disco o en mixto:_ `df.persist(storageLevel=StorageLevel.<MEMORY_ONLY/DISK_ONLY/MEMORY_AND_DISK>)`

_Si queremos borrar todos los dataframes que hemos cacheado con `cache()`:_ `spark.catalog.clearCache()`

_Si queremos despersistir un dataframe que hemos persistido con `persist()`:_ `df.unpersist()`


# Ejercicio 1



Usando los siguientes DataFrames:

In [None]:
movies_df = spark.read.csv(DATA_PATH + 'movie-ratings/movies.csv', sep=',', header=True, inferSchema=True)
ratings_df = spark.read.csv(DATA_PATH + 'movie-ratings/ratings.csv', sep=',', header=True, inferSchema=True)



1) Crea un nuevo DataFrame que calcule el rating medio, máximo, y mínimo de cada película.

2) Filtra `movies_df` por las peliculas que contengan la palabra 'the' en el título.

3) Filtra `movies_df` por aquellas peliculas con al menos un `score` de 2 y una media de 4 o más. Usa un `left-semi` join.

**Nota**: Los métodos *left-semi* y *left-anti* joins son operaciones más rápidas que los joins normales puesto que solo requieren reordenar la tabla de la derecha por clave primaria.

4) Guarda en una lista el título de todas las peliculas que contiene el DataFrame obtenido en el punto anterior.

In [None]:
# Respuesta

from pyspark.sql import functions as F
from pyspark.sql.functions import col

agg_ratings = ratings_df.groupBy('movieId').agg(F.max(col('rating')).alias('max_rating'),
                                  F.min(col('rating')).alias('min_rating'),
                                  F.round(F.avg(col('rating')), 2).alias('avg_rating'))
agg_ratings.show(3)

In [None]:
# Respuesta

movies_df = movies_df.filter(col('title').rlike('the '))

# If we want to create a perfect filter, we shall meake use of regex (which is not part of the course)
movies_df.show(20, truncate=False)

In [None]:
# Respuesta

rating_filter_df = agg_ratings.filter(col('min_rating') >= 2).filter(col('avg_rating') >= 4)
rating_filter_df.show(3)

In [None]:
# Respuesta

print('movies_df number of rows: {}'.format(movies_df.count()))
movies_filtered = movies_df.join(rating_filter_df, on=['movieId'], how='left_semi')
print('movies_filtered number of rows: {}'.format(movies_filtered.count()))

In [None]:
# Respuesta
movies_filtered.show(3)

In [None]:
# Respuesta
titles = [row[0] for row in movies_filtered.select('title').collect()]
print(len(titles))
titles[:10]