# Victor Marinas González

# Examen ETL: SPARK 12+1/02

Se podrá utilizar toda la información que se encuentra en el campus. 

Se va a trabajar sobre varios ficheros de datos:

Usuarios: id_usuario::sexo::edad::id_profesion::codigo_postal

Peliculas: id_pelicula::titulo (año)::tipo1|tipo2|tipo3

Ratings: id_pelicula::id_usuario::puntuacion::fecha_timestamp

A cada una de las preguntas hay que responder explicando brevemente que se pretende hacer antes de lanzar el código.

Al documento lo llamareís con vuestro nombre y apellido. Debeís enviarlo a mi correo de CUNEF antes del final del examen.

El lenguaje para trabajar con Spark podrá ser python o R indistintamente.

In [77]:
# Nota: Carga de las puntuaciones
# Función para parsear la fecha

from datetime import datetime
dateparse = lambda x: datetime.fromtimestamp(float(x))

## Primera tarea: Inicializar spark context y cargar los datos desde los ficheros.

In [None]:
# Importamos la librería correspondiente  y cargamos  el Spark context

from pyspark import SparkContext

sc = SparkContext()

In [79]:
# Cargamos las bases de datos :


movies_data = sc.textFile('./movies.dat.txt')
ratings_data = sc.textFile('./ratings.dat.txt')
users_data = sc.textFile('./users.dat.txt')

In [125]:
#Realizamos el parseado eliminando la separacion por  ::

movies_data_split = movies_data.map(lambda x: x.split('::'))
ratings_data_split = ratings_data.map(lambda x: x.split("::")).map(lambda x: [x[0], x[1], int(x[2]), datetime.fromtimestamp(float(x[3])).year])
users_data_split = users_data.map(lambda x: x.split('::'))

In [126]:
movies_data_split.take(1)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]]

In [127]:
ratings_data_split.take(1)

[['1', '1193', 5, 2000]]

In [128]:
users_data_split.take(1)

[['1', 'F', '1', '10', '48067']]

In [129]:
#Parseamos la base de datos de users, utilizando la funcion row, para que quede una fila debajo de la anterior.

from pyspark.sql import Row


users_data_row = users_data.map(lambda x: x.split('::')).\
    map(lambda x: Row(IDUsuario = x[0],
                     Sexo = x[1],
                     Edad = int(x[2]),
                     CodigoPostal = int(x[4]),
                     Id_Profesion = x[3]))
    



In [86]:
users_data_row.take(1)

[Row(CodigoPostal=48067, Edad=1, IDUsuario='1', Id_Profesion='10', Sexo='F')]

In [90]:
# Parseo la base de datos movies,  Primero hacemos un remplazamiento, despues con split separamos por :: (con split)
# Utilizamos la funcion Row, para que ponga cada fila debajo de la anterior.

from pyspark.sql import Row

movies_data_row = movies_data.map(lambda x: x.replace('|', '::')).\
    map(lambda x: x.split('::')).\
    map(lambda x: Row(id_pelicula = x[0],
                     titulo = x[1],
                     año = str(x[1][-6:]),
                     tipo_1 = x[2],
                     tipo_2 = x[3],
                     tipo_3 = x[4]))
    


In [91]:
movies_data_row.take(1)

[Row(año='(1995)', id_pelicula='1', tipo_1='Animation', tipo_2="Children's", tipo_3='Comedy', titulo='Toy Story (1995)')]

In [92]:
# Parseamos  la base de datos (Ratings). Separaramos  por :: (con split)
# Utilizamos Row, por el mismo motivo que en movies.


from pyspark.sql import Row

ratings_data_row = ratings_data.map(lambda x: x.split('::')).\
    map(lambda x: Row(id_pelicula = x[0],
                     id_usuario = x[1],
                     puntuacion = x[2],
                     fecha = dateparse(x[3])))


In [93]:
ratings_data_row.take(1)

[Row(fecha=datetime.datetime(2000, 12, 31, 22, 12, 40), id_pelicula='1', id_usuario='1193', puntuacion='5')]

## Segunda tarea: Media de puntuaciones globales por año. ¿Hay algún año significativamente distinto?

In [None]:
# Primer paso, en la base de datos Ratings, tenemos que filtrar por  la puntuacion y la fecha (año)

In [137]:
ratings_2 = ratings_data_split.map(lambda x: (x[3],x[2]))

ratings_2.take(5)

[(2000, 5), (2000, 3), (2000, 3), (2000, 4), (2001, 5)]

In [138]:
# Sumamos las puntuaciones por años.

ratings_suma = ratings_2.reduceByKey(lambda x, y: x + y)

ratings_suma.take(5)

[(2000, 3248432), (2002, 83171), (2001, 239037), (2003, 11673)]

In [139]:
# Necesitamos contar el numero de personas que votaron, para obtener la media

count_ratings = ratings_2.map(lambda x: (x[0],1))

count_ratings_2 = count_ratings.reduceByKey(lambda x, y: x + y)

count_ratings_2.take(5)


[(2000, 904757), (2002, 24046), (2001, 68058), (2003, 3348)]

In [149]:
# Cogemos solo el año

ratings_3 = ratings_2.map(lambda x: x[0]).distinct().collect()



In [152]:
# Juntamos las dos tablas, que hemos creado anteriormente.

join = ratings_suma.join(count_ratings_2)

join.take(5)


# Obtenemos el año,  la suma de puntuaciones y las nº de personas que hayn votado


[(2000, (3248432, 904757)),
 (2002, (83171, 24046)),
 (2001, (239037, 68058)),
 (2003, (11673, 3348))]

In [156]:
#Calculamos la media de la suma de puntuaciones / nº de personas

mean = join.map(lambda x: (x[0], round(x[1][0]/x[1][1],1)))

mean.take(5)

[(2000, 3.6), (2002, 3.5), (2001, 3.5), (2003, 3.5)]

Podemos observa que la media de puntuaciones por año es:

**Año 2000, media 3.6**

**Año 2001, media 3.5**

**Año 2002, media 3.5**

**Año 2003, media 3.5**

Podemos concluir que la media estos ultimos años es similar.

## Tercera pregunta: ¿Cuál es la película más votada por los mayores de 60? 

In [100]:
users_data_split.take(1)

[['1', 'F', '1', '10', '48067']]

In [101]:
#Cogemos solo la primera y la segunda columna (id y edad)

mayores_60 = users_data_split.map(lambda x: (x[0], int(x[2])))
mayores_60.take(5)

[('1', 1), ('2', 56), ('3', 25), ('4', 45), ('5', 25)]

In [102]:
#Filtramos la segunda columna por mayores de 60

mayores_60_2 = mayores_60.filter(lambda x: x[1] > 60)



In [103]:
mayores_60_2.take(5)

[]

In [104]:
# No hay nadie mayor de 60 años....

## Cuarta pregunta: ¿Cuál es la puntuación media de las peliculas de acción del año 2000?

In [105]:
#Vemos la estructura de la tabla

movies_data_split.take(1)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"]]

In [106]:
# Queremos filtrar  la columna 3, por "acción":

movies_accion = movies_data_split.filter(lambda x: "Action" in x[2])


In [107]:
movies_accion.take(5)

[['6', 'Heat (1995)', 'Action|Crime|Thriller'],
 ['9', 'Sudden Death (1995)', 'Action'],
 ['10', 'GoldenEye (1995)', 'Action|Adventure|Thriller'],
 ['15', 'Cutthroat Island (1995)', 'Action|Adventure|Romance'],
 ['20', 'Money Train (1995)', 'Action']]

In [130]:
# Filtramos la tabla Ratings por el año 2000.

ano_2000 = ratings_data_split.filter(lambda x: x[3] == 2000)

In [131]:
ano_2000.take(5)

[['1', '1193', 5, 2000],
 ['1', '661', 3, 2000],
 ['1', '914', 3, 2000],
 ['1', '3408', 4, 2000],
 ['1', '1197', 3, 2000]]

In [134]:
ano_2000_2 = ano_2000.map(lambda x: (x[0], x[2]))

ano_2000_3 = ano_2000_2.reduceByKey(lambda x, y: x + y)


In [135]:


ano_2000_3.take(5)

[('1440', 456), ('4986', 118), ('2265', 845), ('3076', 84), ('5023', 401)]

## Quinta pregunta: ¿ Cuál es el año en que mayor número de usuarios votaron?

In [136]:
# Utilizaremos el Dataset Ratings para obtener el año y con reducebyKey sumaremos todos los votos por año


In [42]:
año_mayor = ratings_data_split.map(lambda x: (dateparse(x[3]).year, 1)).\
    reduceByKey(lambda a, b: a + b).persist()

In [43]:
año_mayor.collect()

[(2000, 904757), (2002, 24046), (2001, 68058), (2003, 3348)]

**En el año 2000, con 904.757 votos.**

## Sexta pregunta: ¿ Cuál es la película con mejor puntación media?

Tendriamos que seleccionar de  la base de datos,el id de la pelicula y su puntuación, del datasets Ratings

el Segundo paso seria coger de movies, el id de la pelicula (que es el elemento en común) y el titulo

Y juntariamos las dos tablas, y sabriamos la pelicula con mayor puntuación media



In [160]:
ratings_data_split.take(5)

[['1', '1193', 5, 2000],
 ['1', '661', 3, 2000],
 ['1', '914', 3, 2000],
 ['1', '3408', 4, 2000],
 ['1', '2355', 5, 2001]]

In [161]:
movies_data_split.take(5)

[['1', 'Toy Story (1995)', "Animation|Children's|Comedy"],
 ['2', 'Jumanji (1995)', "Adventure|Children's|Fantasy"],
 ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'],
 ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama'],
 ['5', 'Father of the Bride Part II (1995)', 'Comedy']]

In [None]:
# En Ratings seleccionamos el id_pelicula x[0] y la puntuación x[2]

In [162]:
ratings_ej6 = ratings_data_split.map(lambda x: (x[0], x[2]))

In [163]:
#Con el ReduceByKey lo que hacemos es sumar las puntuaciones, de la cada id_pelicula

#. Ejemplo: la pelicula con id 1440, ha recibido en total 456 puntos

ratings_ej6_2 = ratings_ej6.reduceByKey(lambda x,y: x + y)

ratings_ej6_2.take(5)

[('1440', 456), ('4986', 118), ('2265', 845), ('3076', 84), ('5023', 401)]

In [None]:
# Queremos ver cuantas veces se repite el id_pelicula para obtener la media de cada pelicula.

# Lo hicimos en el Ejercicio 4

In [164]:
ratings_ej6_conteo = ratings_data_split.map(lambda x: (x[0], 1))




In [166]:
#Con el ReduceByKey lo que hacemos es contar el numero de  id_pelicula

# Ejemplo la pelicula con id 1440, ha sido votada 136 veces

ratings_ej6_conteo_2 = ratings_ej6_conteo.reduceByKey(lambda x, y: x + y)

ratings_ej6_conteo_2.take(5)

[('1440', 136), ('4986', 27), ('2265', 232), ('3076', 22), ('5023', 120)]

In [None]:
# El siguiente paso será unir las dos bases de datos, para saber la media por pelicula.

# Redondeamos a dos decimales

In [168]:
# Obtenemos a la izq el id_pelicula y a la derecha su puntuacion media

# Ejemplo la pelicula con id 1440, tiene una puntuacion media de 3.35 puntos

final_ej6 = ratings_ej6_2.join(ratings_ej6_conteo_2)

media = final_ej6.map(lambda x: (x[0], round(x[1][0] / x[1][1], 2)))

media.take(5)

[('1440', 3.35),
 ('4986', 4.37),
 ('2265', 3.64),
 ('3076', 3.82),
 ('5023', 3.34)]

In [169]:
# Seleccionamos las dos variables en la tabla movies (id_pelicula y nombre de la pelicula)

movies_ej6 = movies_data_split.map(lambda x: (x[0], x[1]))

movies_ej6.take(5)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)')]

In [170]:
# Juntamos las dos tablas:

peliculas_media = movies_ej6.join(media)

peliculas_media.take(5)

[('1440', ('Amos & Andrew (1993)', 3.35)),
 ('1805', ('Wild Things (1998)', 4.09)),
 ('145', ('Bad Boys (1995)', 3.0)),
 ('1308', ('I Shot a Man in Vegas (1995)', 3.61)),
 ('3726', ('Assault on Precinct 13 (1976)', 3.47))]

In [None]:
# Una vez tenemos el dataset, queremos ver cual es la peliculas más valorada

In [173]:
peliculas_media.map(lambda x: x[1]).max(lambda x: x[1])

('New Jersey Drive (1995)', 4.96)

** La Pelicula más valorada por los espectadores fue New Jersey Drive con un 4.96 sobre 5 **

In [None]:
Extra Practica:

In [174]:
peliculas_media.map(lambda x: x[1]).min(lambda x: x[1])

('Hamlet (2000)', 1.02)

**La pelicula peor valorada fue Hamlet con una puntuacion de 1.02**