# Examen ETL: SPARK 12+1/02

## Autor: Lucía Saiz Lapique

Se podrá utilizar toda la información que se encuentra en el campus. 

Se va a trabajar sobre varios ficheros de datos:

Usuarios: id_usuario::sexo::edad::id_profesion::codigo_postal

Peliculas: id_pelicula::titulo (año)::tipo1|tipo2|tipo3

Ratings: id_pelicula::id_usuario::puntuacion::fecha_timestamp

A cada una de las preguntas hay que responder explicando brevemente que se pretende hacer antes de lanzar el código.

Al documento lo llamareís con vuestro nombre y apellido. Debeís enviarlo a mi correo de CUNEF antes del final del examen.

El lenguaje para trabajar con Spark podrá ser python o R indistintamente.

In [2]:
# Nota: Carga de las puntuaciones
# Función para parsear la fecha

from datetime import datetime
dateparse = lambda x: datetime.fromtimestamp(float(x))

## Primera tarea: Inicializar spark context y cargar los datos desde los ficheros.

Para el planteamiento y ejecución de los ejercicios, debemos inicializar spark y cargar las tres bases de datos que se nos han proporcionado: movies (lista de películas con su id y género), users (lista de usuarios que puntúan las películas, con su edad, género, profesión y código postal) y ratings (votaciones a cada película con el id de la película y del usuario asociado, además de la fecha en la que se hizo la valoración). 

Hacemos también el parseado de los datos, que están separados por ::, usando la función map y split. 

In [3]:
from pyspark import SparkContext
sc = SparkContext()

In [4]:
data_file = "./movies.dat.txt"
movies = sc.textFile(data_file)

In [5]:
movies.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [6]:
data_file = "./movies.dat.txt"
movies = sc.textFile(data_file)

In [7]:
data_file2 = "./users.dat.txt"
users = sc.textFile(data_file2)

In [8]:
data_file3 = "./ratings.dat.txt"
ratings = sc.textFile(data_file3)

In [9]:
ratings.take(1)

['1::1193::5::978300760']

In [10]:
movies2 = movies.map(lambda x: x.split("::"))
users2 = users.map(lambda x: x.split("::"))
ratings2 = ratings.map(lambda x: x.split("::")).map(lambda x: [x[0], x[1], int(x[2]), datetime.fromtimestamp(float(x[3])).year])
movies2.take(1), users2.take(1), ratings2.take(1)

([['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]],
 [['1', 'F', '1', '10', '48067']],
 [['1', '1193', 5, 2000]])

## Segunda tarea: Media de puntuaciones globales por año. ¿Hay algún año significativamente distinto?

Para sacar la media de puntuaciones globales por año, debemos seguir una serie de pasos: 
1. Con un map, reducimos la base de datos ratings para usar solo los que nos interesan en este caso, que son el año y la valoración de la película en cuestión. Es importante poner el año en la primera posición (índice 0) para que al usar reduceByKey en los siguientes pasos, use el año como referencia para agrupar. 
2. Lo siguiente es agrupar todos los ratings por año, sumándolos con un reduceByKey y así calcular la media de los rating por año más adelante. Con la función distinct cogemos todos los años distintos (únicos) que hay y sumamos todos los ratings asociados a esos años. 
3. A continuación, queremos averiguar el número de ratings total que hay cada año, cifra sobre la que dividiremos la suma de los ratings calculada en el paso anterior y así obtendremos la media. Para crear el contador, con la función map, le asociamos un 1 a cada vez que se repite un año específico en la base de datos que, con un reduceByKey, nos devolverá el número de veces que se ha hecho un rating en los 4 años posibles. 
4. Finalmente, uniremos las dos bases de datos que hemos creado en el paso dos y en el paso tres con un join y, una vez juntas, hacemos la media dividiendo los valores de la base de datos del paso dos entre los del paso tres (aplicamos un round para que nos redondee a dos decimales los valores). 

In [11]:
## paso 1
rating_yr = ratings2.map(lambda x: (x[3], x[2]))
rating_yr.take(5)

[(2000, 5), (2000, 3), (2000, 3), (2000, 4), (2001, 5)]

In [12]:
## paso 2
rating_yr.map(lambda x: x[0]).distinct().collect()

[2000, 2001, 2002, 2003]

In [13]:
suma_ratings = rating_yr.reduceByKey(lambda x, y: x + y)
suma_ratings.take(4)

[(2000, 3248432), (2001, 239037), (2002, 83171), (2003, 11673)]

In [14]:
## paso 3
cantidad = rating_yr.map(lambda x: (x[0], 1))
cantidad_key = cantidad.reduceByKey(lambda x, y: x + y)
cantidad_key.take(4)

[(2000, 904757), (2001, 68058), (2002, 24046), (2003, 3348)]

In [15]:
## paso 4
juntos = suma_ratings.join(cantidad_key)
juntos.take(5)

[(2000, (3248432, 904757)),
 (2001, (239037, 68058)),
 (2002, (83171, 24046)),
 (2003, (11673, 3348))]

In [16]:
media = juntos.map(lambda x: (x[0], round(x[1][0]/x[1][1], 2)))
media.take(4)

[(2000, 3.59), (2001, 3.51), (2002, 3.46), (2003, 3.49)]

No hay ningún año que sea significativamente distinto

## Tercera pregunta: ¿Cuál es la película más votada por los mayores de 60? 

No se puede hacer porque no hay observaciones de usuarios que sean mayores de 60 años. Lo vemos sacando el valor de la edad máxima en la base de datos.

In [17]:
users2.take(1)

[['1', 'F', '1', '10', '48067']]

In [18]:
users_edad = users2.map(lambda x: (x[0], int(x[2])))
users_edad.take(5)

[('1', 1), ('2', 56), ('3', 25), ('4', 45), ('5', 25)]

In [19]:
mayores_60 = users_edad.filter(lambda x: x[1] > 60)
mayores_60.take(5)

[]

In [20]:
users_edad.max(lambda x: x[1])

('2', 56)

## Cuarta pregunta: ¿Cuál es la puntuación media de las peliculas de acción del año 2000?

En este caso tenemos que usar dos de las bases de datos (movies y ratings) ya que tenemos que combinar los datos del tipo de género de las películas y los ratings con la fecha en la que se hicieron. 
1. El primer paso es hacer una nueva base de datos basada en movies que incluya solo las películas que sean de tipo acción. Para ello aplicamos un filtro con la función in.
2. En segundo lugar, filtramos la base de datos ratings para tener solo las valoraciones de película que se hicieron en el año 2000. 
3. Queremos calcular la media de puntuación de cada película de ese año: con un reduceByKey, sumamos para ese año todos los ratings que haya por cada id de película, ya que tenemos que cacular la media de valoración de cada película. De forma parecida al ejercicio anterior, con esa cifra y el número de ratings que haya por película (que averiguamos con un conteo), sacamos la media de estas. 
4. Antes de ejecutar la media, guardamos los datos en una misma base de datos a la que le agregaremos también las películas que sean del género "Acción" con un join a la base de datos que hemos filtrado en el primer paso, donde tendremos en cuenta solo el id de la película y las cifras que hemos calculado en el paso 3.
5. No nos interesa el valor medio del rating de cada película, sino la media total de la suma de todos los ratings de todas las películas en ese año; para ello, necesitamos contar cuantas películas hay y cuántos ratings tiene cada una de esas películas, así que creamos un nuevo contador, que llamamos "Común". Con común vamos a sumar la cifra de las sumas de los ratings por película por un lado, y por otro lado sumaremos la cantidad de ratings que hay por peícula. 
6. Finalmente, al juntar estas dos de nuevo con un join, ejecutamos la media.


In [22]:
movies2.take(1)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]]

In [23]:
## paso 1
pelis_accion = movies2.filter(lambda x: 'Action' in x[2])
pelis_accion.take(5)

[['6', 'Heat (1995)', 'Action|Crime|Thriller'],
 ['9', 'Sudden Death (1995)', 'Action'],
 ['10', 'GoldenEye (1995)', 'Action|Adventure|Thriller'],
 ['15', 'Cutthroat Island (1995)', 'Action|Adventure|Romance'],
 ['20', 'Money Train (1995)', 'Action']]

In [49]:
## paso 2
ratings_2000 = ratings2.filter(lambda x: x[3] == 2000)
ratings_2000.take(5)

[['1', '1193', 5, 2000],
 ['1', '661', 3, 2000],
 ['1', '914', 3, 2000],
 ['1', '3408', 4, 2000],
 ['1', '1197', 3, 2000]]

In [52]:
## paso 3
ratings_2000_red = ratings_2000.map(lambda x: (x[0], x[2]))
ratings_2000_key = ratings_2000_red.reduceByKey(lambda x, y: x + y)
ratings_2000_key.take(5)

[('4334', 564), ('4986', 118), ('2265', 845), ('5964', 2075), ('3603', 920)]

In [54]:
numero_ratings = ratings_2000.map(lambda x: (x[0], 1))
numero_rating_key = numero_ratings.reduceByKey(lambda x, y: x + y)
numero_rating_key.take(5)

[('4334', 157), ('4986', 27), ('2265', 232), ('5964', 549), ('3603', 328)]

In [56]:
total = ratings_2000_key.join(numero_rating_key)
total.take(5)

[('4334', (564, 157)),
 ('4986', (118, 27)),
 ('2265', (845, 232)),
 ('5964', (2075, 549)),
 ('3603', (920, 328))]

In [59]:
## paso 4
pelis_accion_2000 = total.join(pelis_accion)
pelis_accion_2000.take(5)

[('145', ((117, 39), 'Bad Boys (1995)')),
 ('1129', ((427, 103), 'Escape from New York (1981)')),
 ('780', ((1770, 512), 'Independence Day (ID4) (1996)')),
 ('3624', ((1301, 323), 'Shanghai Noon (2000)')),
 ('2019',
  ((106, 30),
   'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)'))]

In [61]:
pelis_final = pelis_accion_2000.map(lambda x: (x[0], x[1][0]))
pelis_final.take(5)

[('145', (117, 39)),
 ('1129', (427, 103)),
 ('780', (1770, 512)),
 ('3624', (1301, 323)),
 ('2019', (106, 30))]

In [62]:
## paso 5
pelis_final_ratings = pelis_final.map(lambda x: ("Comun", x[1][0]))
pelis_final_ratings.take(5)

[('Comun', 117),
 ('Comun', 427),
 ('Comun', 1770),
 ('Comun', 1301),
 ('Comun', 106)]

In [64]:
pelis_final_ratings_key = pelis_final_ratings.reduceByKey(lambda x, y: x + y)
pelis_final_ratings_key.take(1)

[('Comun', 257798)]

In [65]:
pelis_final_count = pelis_final.map(lambda x: ("Comun", x[1][1]))
pelis_final_count_key = pelis_final_count.reduceByKey(lambda x, y: x + y)
pelis_final_count_key.take(1)

[('Comun', 72835)]

In [68]:
## paso 6
media2 = pelis_final_ratings_key.join(pelis_final_count_key)
media2.take(1)

[('Comun', (257798, 72835))]

In [71]:
media_2000_accion = media2.map(lambda x: round(x[1][0]/x[1][1],2))
media_2000_accion.take(1)

[3.54]

## Quinta pregunta: ¿Cuál es el año en que mayor número de usuarios votaron?

Lo más importante en este ejercicio es coger todos los valors únicos de la fecha y el índice del usuario; muchos usarios hacen varias valoraciones en el mismo año, con lo cual no podemos coger todas las valoraciones que se hayan hecho en cada año. Debemos seleccionar, de la base de datos ratings, todos los usuarios diferentes que hay en cada año y guardarlos en un objeto. Es importante guardar el año en la primera posición (índice 0) para que el reduceByKey agrupe por año. Con ese objeto creado, hacemos un reduceByKey que haga de contador de todos esos valores únicos para saber cuántos usuarios hay en cada año. Por último, para averiguar qué año hay más, usamos la función max sobre el índice 1, que es donde tenemos guardado el número de usuarios que puntúan las películas anualmente. 

In [72]:
ratings2.take(1)

[['1', '1193', 5, 2000]]

In [101]:
rating_user = ratings2.map(lambda x: (x[3], x[1])).distinct()
rating_user.take(5)

[(2002, '2965'), (2000, '3469'), (2000, '320'), (2002, '2758'), (2000, '2020')]

In [102]:
rating_user_map = rating_user.map(lambda x: (x[0], 1))
rating_user_key = rating_user_map.reduceByKey(lambda x, y: x + y)

In [103]:
rating_user_key.max(lambda x: x[1])

(2000, 3678)

## Sexta pregunta: ¿ Cuál es la película con mejor puntación media?

En este ejercicio debemos caulcular la media de puntuaciones a lo largo de todos los años que incluye la base de datos específica para cada película. Seguimos los siguientes pasos:
1. Como en los ejercicios anteriores, con la función reduceByKey, creando un contador y con un join, sacamos la media de puntuaciones asociado a cada id de película en la base de datos ratings.
2. Una vez tenemos la media de cada película asociado a su id, unimos nuestro filtro a la base de datos movies para asociarle la media a su nombre de película respectivo. 
3. Finalmente, con la función max, obtenemos el nombre de la película con la mayor media de puntuaciones a lo largo de los años. Como ya no nos interesa el valor del id de la película, con un map, lo excluimos del código respuesta al max. 

In [82]:
movies2.take(1)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]]

In [84]:
ratings2.take(1)

[['1', '1193', 5, 2000]]

In [105]:
## paso 1
ratings_6_suma = ratings2.map(lambda x: (x[0], x[2]))
ratings_6_suma_key = ratings_6_suma.reduceByKey(lambda x, y: x + y)
ratings_6_suma_key.take(5)

[('4334', 564), ('4986', 118), ('2265', 845), ('5964', 2075), ('3603', 920)]

In [107]:
ratings_6_count = ratings2.map(lambda x: (x[0], 1))
ratings_6_count_key = ratings_6_count.reduceByKey(lambda x, y: x + y)
ratings_6_count_key.take(5)

[('4334', 157), ('4986', 27), ('2265', 232), ('5964', 549), ('3603', 328)]

In [110]:
juntitos = ratings_6_suma_key.join(ratings_6_count_key)
media = juntitos.map(lambda x: (x[0], round(x[1][0]/x[1][1], 2)))
media.take(5)

[('4334', 3.59), ('4986', 4.37), ('2265', 3.64), ('5964', 3.78), ('3603', 2.8)]

In [111]:
## paso 2
movies_red = movies2.map(lambda x: (x[0], x[1]))
media_pelis = movies_red.join(media)
media_pelis.take(5)                         

[('1440', ('Amos & Andrew (1993)', 3.35)),
 ('2265', ('Nothing But Trouble (1991)', 3.64)),
 ('3076', ('Irma la Douce (1963)', 3.82)),
 ('2723', ('Mystery Men (1999)', 3.43)),
 ('145', ('Bad Boys (1995)', 3.0))]

In [116]:
## paso 3
media_pelis.map(lambda x: x[1]).max(lambda x: x[1])

('New Jersey Drive (1995)', 4.96)

In [24]:
sc.stop()