## 0. Preparación de los ficheros

#### Cargar paquetes y librerías necesarias

In [1]:
import sys
import pyspark
import findspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
import pandas as pd
pd.set_option('display.max_rows', None)

In [2]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.stat import Summarizer
import numpy as np

In [3]:
conf = SparkConf().setMaster("local").setAppName("AML")
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/17 18:36:50 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/17 18:36:50 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/17 18:36:51 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/17 18:36:51 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


#### Importar los CSV y formatearlos

In [6]:
CSVanime = spark.read.option("quote", "\"").option("escape", "\"").csv('gs://anime_uem/anime.csv', inferSchema=True, header=True)

                                                                                

In [7]:
CSVrating = spark.read.option("quote", "\"").option("escape", "\"").csv('gs://anime_uem/rating_complete.csv', inferSchema=True, header=True)

                                                                                

In [8]:
CSVep = spark.read.option("quote", "\"").option("escape", "\"").csv('gs://anime_uem/valoraciones_EP.csv', inferSchema=True, header=True).toDF("user_id", "anime_id", "rating")

#### Comprobar que maneja bien las comas y las comillas

In [None]:
print("Dos primeros nombres que contienen [,]:")
CSVanime.select("Name", "ID").filter(col("Name").like("%,%")).show(2, truncate=False)
print("Dos primeros nombres que contienen [']:")
CSVanime.select("Name", "ID").filter(col("Name").like("%'%")).show(2, truncate=False)
print("Dos primeros nombres que contienen [\"]:")
CSVanime.select("Name", "ID").filter(col("Name").like('%"%')).show(2, truncate=False)
print("Dos primeros nombres con caracteres japoneses:")
CSVanime.select("Japanese name", "ID").show(2, truncate=False)

#### Analizar filas de los CSV

In [None]:
print(CSVanime.columns)

In [None]:
CSVrating.show(3)

#### Añadir cabeceras al CSV valoraciones

In [None]:
CSVep = CSVep.toDF(CSVrating.columns[0], CSVrating.columns[1], CSVrating.columns[2])
CSVep.show(3)
print("El dataframe tiene", CSVep.count(), "filas")

## 1. Hacer un análisis exploratorio de los datos con pySpark, mostrando información relevante de los mismos (cuáles son los ítems mejor y peor valorados, la relación entre género y valoraciones, estudios con mejor y peor nota media, etc...).

In [None]:
minValoraciones = 50
cantidadTop = 3
mejorMedia = CSVrating.groupBy("anime_id").agg(F.mean("rating"), F.count("rating"))
mejorMedia = mejorMedia.withColumnRenamed("avg(rating)", "average")
mejorMedia = mejorMedia.withColumnRenamed("count(rating)", "count")
juntar = CSVanime.select("ID", "Name")
mejorMedia = mejorMedia.join(juntar, CSVrating.anime_id==juntar.ID)
mejorMedia = mejorMedia.where(col("count") > minValoraciones)
print("Top", cantidadTop, "items con mayor nota media (con al menos", minValoraciones, "valoraciones):")
mejorMedia.sort("average", ascending=False).withColumnRenamed("count", "Nº ratings").drop("ID", "anime_id").show(cantidadTop, truncate=False)
print("Top", cantidadTop, "items con peor nota media (con al menos", minValoraciones, "valoraciones):")
mejorMedia.sort("average", ascending=True).withColumnRenamed("count", "Nº ratings").drop("ID", "anime_id").show(cantidadTop, truncate=False)

##### La categoría con menos items tiene 1690, por lo que no será necesario eliminar ninguna a la hora de futuros estudios

In [None]:
print("Géneros con mejor valoración media:")
CSVgeneros = CSVanime.select("Genres", "ID").filter(CSVanime.Genres != "Unknown")
CSVgeneros = CSVgeneros.join(CSVrating, CSVgeneros.ID==CSVrating.anime_id).drop("ID", "anime_id", "user_id")
CSVgeneros = CSVgeneros.select(split(col("Genres"), ","), "rating").withColumnRenamed("split(Genres, ,, -1)", "Genres")
CSVgeneros = CSVgeneros.select(explode(CSVgeneros.Genres), CSVgeneros.rating).withColumnRenamed("col", "Genero")
CSVgeneros = CSVgeneros.withColumn("Genero", trim(CSVgeneros.Genero))
CSVgeneros.groupBy("Genero").agg(F.mean("rating")).sort("avg(rating)", ascending=False).show()
print("Géneros con peor valoración media:")
CSVgeneros.groupBy("Genero").agg(F.mean("rating")).sort("avg(rating)", ascending=True).show()

In [None]:
CSVestudios = CSVanime.select("Studios", "Score")
CSVestudios = CSVestudios.filter((CSVestudios.Score != "Unknown") & (CSVestudios.Studios != "Unknown"))
CSVestudios = CSVestudios.groupBy("Studios").agg(F.mean("Score"), F.count("Score"))
CSVestudios = CSVestudios.withColumnRenamed("avg(Score)", "Valoracion_media").withColumnRenamed("count(Score)", "Cantidad")
CSVestudios = CSVestudios.filter(CSVestudios.Cantidad >= 30).drop("Cantidad")
print("Estudios con peor valoración media (con al menos 30 valoraciones):")
CSVestudios.orderBy('Valoracion_media', ascending=True).show()
print("Estudios con mejor valoración media (con al menos 30 valoraciones):")
CSVestudios.orderBy('Valoracion_media', ascending=False).show(truncate=False)

## 2. Crear un programa con Spark ML y el algoritmo de recomendación ALS que genere un listado con 5 series de TV y 5 películas para recomendar al usuario EP (con id 666666) para ello se deben incorporar las valoraciones de EP al fichero de valoraciones total, entrenar el algoritmo y pedirle un listado de recomendaciones para ese usuario. Las recomendaciones generadas deben incluir el ID del anime y los títulos original (name) y en inglés (English name). El listado debe aparecer ordenado por valoración media de cada serie/película

In [9]:
def valMedia(df):
    df = df.withColumn("Val_media", col("Score-10")*10+col("Score-9")*9+col("Score-8")*8+col("Score-7")*7+col("Score-6")*6+col("Score-5")*5+col("Score-4")*4+col("Score-3")*3+col("Score-2")*2+col("Score-1"))
    df = df.withColumn("Val_media", col("Val_media")/(col("Score-1")+col("Score-2")+col("Score-3")+col("Score-4")+col("Score-5")+col("Score-6")+col("Score-7")+col("Score-8")+col("Score-9")+col("Score-10")))
    df = df.drop("Score-1", "Score-2", "Score-3", "Score-4", "Score-5", "Score-6", "Score-7", "Score-8", "Score-9", "Score-10")
    return df

In [10]:
def durationNumber(df):
    df = df.withColumn("Duration", regexp_replace("Duration", " per ep.", ""))
    df = df.withColumn("Duration", regexp_replace("Duration", "\.", ""))
    df = df.withColumn("Duration", regexp_replace("Duration", " hr", "h"))
    df = df.withColumn("Duration", regexp_replace("Duration", " min", "m"))
    df = df.withColumn("Duration", reverse(split(col("Duration"), " ")))
    df = df.withColumn("mins", regexp_replace(col("Duration")[0], "m", ""))
    df = df.withColumn("hours", regexp_replace(col("Duration")[1], "h", ""))
    df = df.fillna({"hours": 0})
    df = df.withColumn("DurationN", col("hours")*60+col("mins"))
    df = df.drop("Duration", "mins", "hours")
    return df

#### Limpiar y preparar el dataframe

In [11]:
CSVml = CSVrating.union(CSVep).withColumnRenamed("rating", "User_rating")
CSVml = CSVml.join(CSVanime, CSVml.anime_id==CSVanime.ID)
CSVml = CSVml.select("user_id", "anime_id", "User_rating", "Genres", "name", "English name", "Type", "Episodes", "Studios", "Source", "Ranked", "Duration", "Popularity", "Score-10", "Score-9", "Score-8", "Score-7", "Score-6", "Score-5", "Score-4", "Score-3", "Score-2", "Score-1")
CSVml = CSVml.withColumnRenamed("English name", "English_name")
CSVml = CSVml.withColumn("anime_id", col("anime_id").cast(IntegerType()))
CSVml = valMedia(CSVml)
CSVml = durationNumber(CSVml)
CSVml = CSVml.where(CSVml.Episodes != "Unknown").where(CSVml.English_name != "null").where(CSVml.English_name != "Unknown")
CSVml = CSVml.na.drop(subset=["DurationN", "Val_media"])
print("El dataframe tiene", CSVml.count(), "filas")
CSVml.printSchema()
CSVml.show(5)

                                                                                

El dataframe tiene 46357218 filas
root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- User_rating: double (nullable = true)
 |-- Genres: string (nullable = true)
 |-- name: string (nullable = true)
 |-- English_name: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Ranked: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Val_media: double (nullable = true)
 |-- DurationN: double (nullable = true)



                                                                                

+-------+--------+-----------+--------------------+--------------------+--------------------+-----+--------+---------------+--------+------+----------+-----------------+---------+
|user_id|anime_id|User_rating|              Genres|                name|        English_name| Type|Episodes|        Studios|  Source|Ranked|Popularity|        Val_media|DurationN|
+-------+--------+-----------+--------------------+--------------------+--------------------+-----+--------+---------------+--------+------+----------+-----------------+---------+
|      0|     430|        9.0|Military, Comedy,...|Fullmetal Alchemi...|Fullmetal Alchemi...|Movie|       1|          Bones|   Manga|1361.0|       506|7.587968995355985|    105.0|
|      0|    1004|        5.0|Drama, Psychologi...|Kanojo to Kanojo ...|She and Her Cat:T...|  OVA|       1|        Unknown|Original|2226.0|      1624|7.350737797956867|      4.0|
|      0|    3010|        7.0|Adventure, Histor...|      Kaiketsu Zorro|The Magnificent Z...|   TV| 

#### DataFrame con películas

In [None]:
EPpelis = CSVml.where(CSVml.Type == "Movie").drop("Type", "Episodes", "Japanese_name")
print("El dataframe tiene", EPpelis.count(), "filas")
EPpelis = EPpelis.withColumn("Ranked", col("Ranked").cast(IntegerType()))
indexer = StringIndexer(inputCol="Genres", outputCol="GenresN")
EPpelis = indexer.fit(EPpelis).transform(EPpelis)
indexer = StringIndexer(inputCol="Studios", outputCol="StudiosN")
EPpelis = indexer.fit(EPpelis).transform(EPpelis)
indexer = StringIndexer(inputCol="Source", outputCol="SourceN")
EPpelis = indexer.fit(EPpelis).transform(EPpelis)
EPpelis.show(5)

In [None]:
EPpelisN = EPpelis.drop("Genres", "Studios", "Source")
EPpelisN.printSchema()
EPpelisN.show(5)

#### DataFrame con series

In [13]:
EPseries = CSVml.where(((CSVml.Type == "ONA") & (CSVml.Episodes > 1)) | (CSVml.Type == "TV")).drop("Japanese_name")
print("El dataframe tiene", EPseries.count(), "filas")
EPseries = EPseries.withColumn("Episodes", col("Episodes").cast(IntegerType()))
EPseries = EPseries.withColumn("Ranked", col("Ranked").cast(IntegerType()))
indexer = StringIndexer(inputCol="Genres", outputCol="GenresN")
EPseries = indexer.fit(EPseries).transform(EPseries)
indexer = StringIndexer(inputCol="Type", outputCol="TypeN")
EPseries = indexer.fit(EPseries).transform(EPseries)
indexer = StringIndexer(inputCol="Studios", outputCol="StudiosN")
EPseries = indexer.fit(EPseries).transform(EPseries)
indexer = StringIndexer(inputCol="Source", outputCol="SourceN")
EPseries = indexer.fit(EPseries).transform(EPseries)
EPseries.show(5)

                                                                                

El dataframe tiene 35896854 filas


                                                                                

+-------+--------+-----------+--------------------+-------------------+--------------------+----+--------+---------------+------------+------+----------+------------------+---------+-------+-----+--------+-------+
|user_id|anime_id|User_rating|              Genres|               name|        English_name|Type|Episodes|        Studios|      Source|Ranked|Popularity|         Val_media|DurationN|GenresN|TypeN|StudiosN|SourceN|
+-------+--------+-----------+--------------------+-------------------+--------------------+----+--------+---------------+------------+------+----------+------------------+---------+-------+-----+--------+-------+
|      0|    3010|        7.0|Adventure, Histor...|     Kaiketsu Zorro|The Magnificent Z...|  TV|      52|Ashi Production|       Other|  2655|      5104| 7.227823867262285|     24.0| 1336.0|  0.0|   302.0|    8.0|
|      0|    1571|       10.0|Mystery, Comedy, ...|         Ghost Hunt|          Ghost Hunt|  TV|      25|      J.C.Staff| Light novel|   811|  

In [14]:
EPseriesN = EPseries.drop("Genres", "Type", "Studios", "Source")
EPseriesN.printSchema()
EPseriesN.show(5)

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- User_rating: double (nullable = true)
 |-- name: string (nullable = true)
 |-- English_name: string (nullable = true)
 |-- Episodes: integer (nullable = true)
 |-- Ranked: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Val_media: double (nullable = true)
 |-- DurationN: double (nullable = true)
 |-- GenresN: double (nullable = false)
 |-- TypeN: double (nullable = false)
 |-- StudiosN: double (nullable = false)
 |-- SourceN: double (nullable = false)



                                                                                

+-------+--------+-----------+-------------------+--------------------+--------+------+----------+------------------+---------+-------+-----+--------+-------+
|user_id|anime_id|User_rating|               name|        English_name|Episodes|Ranked|Popularity|         Val_media|DurationN|GenresN|TypeN|StudiosN|SourceN|
+-------+--------+-----------+-------------------+--------------------+--------+------+----------+------------------+---------+-------+-----+--------+-------+
|      0|    3010|        7.0|     Kaiketsu Zorro|The Magnificent Z...|      52|  2655|      5104| 7.227823867262285|     24.0| 1336.0|  0.0|   302.0|    8.0|
|      0|    1571|       10.0|         Ghost Hunt|          Ghost Hunt|      25|   811|       766|7.7579663241750785|     25.0|  479.0|  0.0|     0.0|    1.0|
|      0|     121|        9.0|Fullmetal Alchemist| Fullmetal Alchemist|      51|   337|        52|  8.15076332151554|     24.0|   73.0|  0.0|     3.0|    0.0|
|      0|     356|        9.0|    Fate/stay ni

#### Entrenamiento películas

In [None]:
training, test = EPpelisN.randomSplit([0.8, 0.2])
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="anime_id", ratingCol="User_rating", coldStartStrategy="drop")
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="User_rating", predictionCol="prediction")

userRecs = model.recommendForAllUsers(5)
userRecs.show(5, truncate=False)

In [None]:
recomPelis = userRecs.where(col("user_id") == 666666)
recomPelis = recomPelis.select("user_id", explode(recomPelis.recommendations)).withColumnRenamed("col", "seleccion")
recomPelis = recomPelis.select("user_id", "seleccion.*")
recomPelis.printSchema()
recomPelis.show(truncate=False)

#### Entrenamiento series

In [15]:
training, test = EPseriesN.randomSplit([0.8, 0.2])
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="anime_id", ratingCol="User_rating", coldStartStrategy="drop")
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="User_rating", predictionCol="prediction")

userRecs = model.recommendForAllUsers(5)
userRecs.show(5, truncate=False)



+-------+---------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                    |
+-------+---------------------------------------------------------------------------------------------------+
|31     |[{3869, 34.61431}, {3372, 25.004154}, {19987, 22.906887}, {36530, 21.224953}, {36164, 19.045256}]  |
|34     |[{28145, 17.19398}, {36722, 13.403998}, {36425, 13.117142}, {38556, 12.686409}, {38378, 12.4403}]  |
|53     |[{8182, 16.302698}, {28145, 16.115284}, {32807, 12.922548}, {27457, 10.951656}, {38378, 10.840336}]|
|65     |[{3372, 13.038048}, {4782, 11.810699}, {5636, 11.008157}, {2921, 10.590764}, {34724, 10.436701}]   |
|78     |[{36722, 15.980451}, {3372, 15.367884}, {33473, 13.069779}, {38556, 12.745124}, {36425, 12.40995}] |
+-------+---------------------------------------------------------------------------------------------------+
only showi

                                                                                

In [16]:
recomSeries = userRecs.where(col("user_id") == 666666)
recomSeries = recomSeries.select("user_id", explode(recomSeries.recommendations)).withColumnRenamed("col", "seleccion")
recomSeries = recomSeries.select("user_id", "seleccion.*")
recomSeries.printSchema()
recomSeries.show(truncate=False)

root
 |-- user_id: integer (nullable = false)
 |-- anime_id: integer (nullable = true)
 |-- rating: float (nullable = true)





+-------+--------+---------+
|user_id|anime_id|rating   |
+-------+--------+---------+
|666666 |3869    |22.101383|
|666666 |8786    |15.59967 |
|666666 |14623   |14.103424|
|666666 |19987   |12.949297|
|666666 |28145   |12.508526|
+-------+--------+---------+



                                                                                

#### Películas recomedadas para EP:

In [None]:
EPpelisElegidas = EPpelis.where((col("anime_id") == 32890) | (col("anime_id") == 33533) | (col("anime_id") == 21129) | (col("anime_id") == 37392) | (col("anime_id") == 4621))
EPpelisElegidas = EPpelisElegidas.dropDuplicates(["anime_id"])
EPpelisElegidas = EPpelisElegidas.select("anime_id", "User_rating", "name", "English_name", "Val_media")
EPpelisElegidas = EPpelisElegidas.sort("Val_media", ascending=False)
EPpelisElegidas.show(truncate=False)

#### Series recomedadas para EP:

In [22]:
EPseriesElegidas = EPseries.where((col("anime_id") == 3869) | (col("anime_id") == 8786) | (col("anime_id") == 14623) | (col("anime_id") == 19987) | (col("anime_id") == 28145))
EPseriesElegidas = EPseriesElegidas.dropDuplicates(["anime_id"])
EPseriesElegidas = EPseriesElegidas.select("anime_id", "name", "English_name", "Val_media")
EPseriesElegidas = EPseriesElegidas.sort("Val_media", ascending=False)
EPseriesElegidas.show(truncate=False)



+--------+--------------------------------+-------------------------------+----------------+
|anime_id|name                            |English_name                   |Val_media       |
+--------+--------------------------------+-------------------------------+----------------+
|3869    |Sakura Momoko Gekijou: Coji-Coji|Coji-Coji                      |6.37037037037037|
|8786    |Inakappe Taishou                |General Inakappe               |5.768           |
|28145   |Johnny Cypher                   |Johnny Cypher in Dimension Zero|5.625           |
|19987   |Kaitou Pride                    |Dr. Zen                        |5.55            |
|14623   |Chikyuu SOS Sore Ike Kororin    |Do it Kororin Earth SOS        |5.375           |
+--------+--------------------------------+-------------------------------+----------------+



                                                                                

## 3. Además se debe recuperar y mostrar información detallada (sinopsis, imagen, trailer, etc.) de cada serie o película extrayendo la información con la API Jikan (https://docs.api.jikan.moe Links to an external site.). Por ejemplo, para recuperar la información básica del anime con ID 13601, hay que hacer una llamada a la URL https://api.jikan.moe/v4/anime/13601/full Links to an external site.y luego parsear los datos del JSON que devuelve. Mostrar la información recuperada de forma gráfica en el cuaderno