## Title

# Data Merge

### Description:

In this notebook we will have a first look to the 4 initial dataset and concat them in order to work with the full dataset.

### Authors:

#### Hugo Cesar Octavio del Sueldo¶
#### Jose Lopez Galdon

### Date:
04/12/2020
### Version:¶
1.0

## Lets show how to perform an ALS algorithm with our MovieLens

First of all, we will create the sparkContext and we will create the RDD from our files downloaded from the official website

In [2]:
    #Findspark to locate the spark in the system
#import findspark
#findspark.init()
    # Initialize the spark context
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
    # Due to we are going to work with sparkSQL we will introduce the sparksql context
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
datos_movies = "../data/01_raw/movies.csv"
datos_ratings = "../data/01_raw/ratings.csv"
datos_user = "../data/01_raw/tags.csv"
datos_user_ratings = "../data/01_raw/links.csv"

### Movies dataset

In [6]:
raw_movies = spark.read.format("csv") \
                    .option("sep", ",") \
                    .option("inferSchema", "true") \
                    .option("header", "true") \
                    .load(f'../data/{datos_movies}')

'''
spark.read Es necesario para decir que vamos a leer de alguna fuente.

format("csv") Da cuenta del formato que estamos utilizando (en este caso un archivo csv).

option("sep", ",") Establece que el separador de campos en el archivo de datos es punto y coma en lugar de coma (el que viene por defecto).

option("inferSchema", "true") Establece que Spark trate de inferir que tipo de datos tiene cada columna 
(e.g. entero, cadena de caracteres, etc). Esto muchas veces sirve si los datos están limpios y sabemos bien
que hay en cada columna. Si el algoritmo que infiere el tipo de datos no puede decidirse, definirá a la columna
por defecto como tipo string (o cadena de caracteres) y dependerá de nosotros revisarlo.

option("header", "true") Esta opción establece que el archivo de datos tiene un encabezado (i.e. el nombre de
cada una de las columnas) en la primera línea.

load(f'../data/{datos_movies}') Esta es la parte que verdaderamente carga el conjunto de datos que será asignado
a la variable raw_movies.

Este código está escrito en Scala
'''

#raw_movies = sc.textFile(f'../recommendation_engine/data/{datos_movies}')

'\nspark.read Es necesario para decir que vamos a leer de alguna fuente.\n\nformat("csv") Da cuenta del formato que estamos utilizando (en este caso un archivo csv).\n\noption("sep", ",") Establece que el separador de campos en el archivo de datos es punto y coma en lugar de coma (el que viene por defecto).\n\noption("inferSchema", "true") Establece que Spark trate de inferir que tipo de datos tiene cada columna \n(e.g. entero, cadena de caracteres, etc). Esto muchas veces sirve si los datos están limpios y sabemos bien\nque hay en cada columna. Si el algoritmo que infiere el tipo de datos no puede decidirse, definirá a la columna\npor defecto como tipo string (o cadena de caracteres) y dependerá de nosotros revisarlo.\n\noption("header", "true") Esta opción establece que el archivo de datos tiene un encabezado (i.e. el nombre de\ncada una de las columnas) en la primera línea.\n\nload(f\'../data/{datos_movies}\') Esta es la parte que verdaderamente carga el conjunto de datos que será a

Now we have our data file loaded into the raw_movies RDD.

Without getting into *Spark transformations and actions*, the most basic thing we can do to check that we got our RDD contents right is to check the first few entries in our data. We can also count() the number of lines loaded from the file into the RDD.

In [7]:
raw_movies.take(5) #the first 5 observations with take function

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

In [8]:
raw_movies.count() #count the number of observation in the rdd

58098

To see the result in more interactive manner (rows under the columns), we can use the show operation

In [9]:
raw_movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [10]:
raw_movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



As we can see, it shows us each column (by name, according to the file header) and its type.

In [11]:
print(type(raw_movies))

<class 'pyspark.sql.dataframe.DataFrame'>


*Describe* operation is use to calculate the summary statistics of numerical column(s) in DataFrame. If we don’t specify the name of columns it will calculate summary statistics for all numerical columns present in DataFrame.

In [12]:
raw_movies.describe().show()

+-------+------------------+--------------------+------------------+
|summary|           movieId|               title|            genres|
+-------+------------------+--------------------+------------------+
|  count|             58098|               58098|             58098|
|   mean|111919.51619677097|                null|              null|
| stddev|59862.660955928804|                null|              null|
|    min|                 1|"""Great Performa...|(no genres listed)|
|    max|            193886|     줄탁동시 (2012)|           Western|
+-------+------------------+--------------------+------------------+



Let’s check what happens when we specify the name of a categorical / String columns in describe operation.

In [None]:
raw_movies.describe('movieID').show()

To split the observations we will use the map function with a lambda and inside the split function separated by comma

In [9]:
dt_movies = raw_movies.map(lambda x: x.split(",")) 

In [10]:
dt_movies.count() #para ver que no perdimos nada. Validamos que hicimos bien el parseado

58099

In [11]:
dt_movies.take(2)

[['movieId', 'title', 'genres'],
 ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']]

In [12]:
dt_movies.map(lambda x: x[1]).distinct().count()

57328

### Ratings dataset

In [4]:
raw_ratings = spark.read.format("csv") \
                    .option("sep", ",") \
                    .option("inferSchema", "true") \
                    .option("header", "true") \
                    .load(f'../data/{datos_ratings}')

#raw_ratings = sc.textFile(f'../data/{datos_ratings}')

In [19]:
print(type(raw_ratings))

<class 'pyspark.sql.dataframe.DataFrame'>


In [5]:
raw_ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [6]:
raw_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [13]:
raw_ratings.count()

27753445

In [17]:
raw_ratings.take(5) 
#tiene una fecha en formato timestamp entonces hay que hacer el parseado de la misma manera
#q le llamamos antes

[Row(userId=1, movieId=307, rating=3.5, timestamp=1256677221),
 Row(userId=1, movieId=481, rating=3.5, timestamp=1256677456),
 Row(userId=1, movieId=1091, rating=1.5, timestamp=1256677471),
 Row(userId=1, movieId=1257, rating=4.5, timestamp=1256677460),
 Row(userId=1, movieId=1449, rating=4.5, timestamp=1256677264)]

In [18]:
# Nota: Carga de las puntuaciones
# Función para parsear la fecha

from datetime import datetime
dateparse = lambda x: datetime.fromtimestamp(float(x))

In [19]:
dt_ratings = raw_ratings.map(lambda x: x.split("::"))
dt_ratings = dt_ratings.map(lambda x: (x[0], x[1], int(x[2]), dateparse(x[3])))
dt_ratings.take(2)

AttributeError: 'DataFrame' object has no attribute 'map'

In [None]:
dt_ratings.map(lambda x: int(x[2])).stats()

In [None]:
numero_pelicula = dt_ratings.map(lambda x: x[1]).distinct().count()
print(numero_pelicula) #aqui estamos seguros de que hay peliculas sin puntuar

In [None]:
numero_usuario = dt_ratings.map(lambda x: x[0]).distinct().count()
print(numero_pelicula) #aqui vemos el numero de usuarios distintos