#### El cliente nos ha solicitado realizar algunos métodos que resuelvan consultas específicas sobre las tablas movies_df, ratings_df y tags_df. Lee cuidadosamente cada consulta y desarrolla el método correspondiente dada la firma del método requerida.

##### Nota: Para poder trabajar con este notebook es necesario haber terminado el ejercicio de la sesión 06

In [None]:
%%HTML <style>pre { white-space: pre !important; }</style>

In [None]:
// NO MODIFICAR CONTENIDO DE ESTA CELDA
import org.apache.spark.sql.{SparkSession, DataFrame, Column, Row}
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.{types => t}

def difference(l1: Seq[String], l2: Seq[String]): Seq[Column] =
    l1.diff(l2).map(colName => f.col(colName))

def readTmpDf(dfSeq: Seq[String]): Map[String, DataFrame] =
    dfSeq.map(table_name => (table_name, spark.read.parquet("../../resources/data/tmp/parquet/" + table_name))).toMap

def writeTmpDf(dfSeq: Seq[(DataFrame, String)]): Unit = 
    dfSeq.foreach{case (df: DataFrame, name: String) => df.write.mode("overwrite").parquet("../../resources/data/tmp/parquet/" + name)}

def schema_to_ddl(df: DataFrame): String = df.schema.toDDL.replace(" NOT NULL", "")

In [None]:
// NO MODIFICAR CONTENIDO DE ESTA CELDA

// Creación de sesión de Spark
val spark = SparkSession.builder
    .master("local[*]")
    .appName("ejercicio_7")
    .getOrCreate()

spark.conf.set("spark.sql.session.timeZone", "GMT-6")

// Carga de tablas requeridas
val RootPath = "../../resources/data/tmp/parquet/"
val namesList = Seq("06/movies", "06/ratings", "06/tags")
val dfMap = readTmpDf(namesList)

val moviesDf = dfMap("06/movies")
val ratingsDf = dfMap("06/ratings")
val tagsDf = dfMap("06/tags")

moviesDf.show(1, false)
ratingsDf.show(1)
tagsDf.show(1)

#### Actividad 1:
##### TO DO ->    Para el dataframe "moviesDf":
- ##### Genera un método llamado getAllGenres que retorne un DataFrame con únicamente una columna conteniendo todos los valores distintos (sin repetir) de la columna "genres"
    - ##### Firma: def getAllGenres(df: DataFrame): DataFrame
    - Apoyate de la funcion explode -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html#pyspark.sql.functions.explode

- ##### Genera un método llamado get_min_year que retorne un valor de tipo int que contenga el menor año registrado (omitiendo nulls)
    - ##### Firma: def getMinYear(df: DataFrame): Int
    - ##### Necesitarás llamar alguna de las siguientes acciones: take, first, head.

- ##### Genera un método llamado get_min_year que retorne un valor de tipo int que contenga el mayor año registrado (omitiendo nulls)
    - ##### Firma: def getMaxYear(df: DataFrame): Int
    - ##### Necesitarás llamar alguna de las siguientes acciones: take, first, head.

##### NO UTILIZAR withColumn NI withColumnRenamed

In [None]:
// TU CODIGO VA EN ESTA CELDA:

def getAllGenres(df: DataFrame): DataFrame =
    df // modificar codigo interno
    
def getMinYear(df: DataFrame): Int =
    return df.select("year").first.getAs[Int]("year") // modificar codigo interno
    
def getMaxYear(df: DataFrame): Int =
    return df.select("year").first.getAs[Int]("year") // modificar codigo interno

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

getAllGenres(moviesDf).show(20, false)
"""
Ejemplo de salida esperada (el nombre de la columna podría ser distinto):
+-----------+
|col        |
+-----------+
|Crime      |
|Romance    |
|Thriller   |
|Adventure  |
|Drama      |
|War        |
|Documentary|
|Fantasy    |
|Mystery    |
|Musical    |
|Animation  |
|Film-Noir  |
|IMAX       |
|Horror     |
|Western    |
|Comedy     |
|Children   |
|Action     |
|Sci-Fi     |
+-----------+
"""
println(getMinYear(moviesDf))
// Salida esperada: 1874
println(getMaxYear(moviesDf))
// Salida esperada: 2023

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

val genresDf = getAllGenres(moviesDf)

assert(genresDf.isInstanceOf[DataFrame])
assert(genresDf.columns.size == 1)

val expectedOutput = Seq("Action", "Adventure", "Animation", "Children", "Comedy", 
                         "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", 
                         "Horror", "IMAX", "Musical", "Mystery", "Romance", "Sci-Fi", 
                         "Thriller", "War", "Western")
val genresList = genresDf.rdd.map(item => item(0).toString).collect().sortWith(_.compareTo(_) < 0)
assert(genresList.diff(expectedOutput).size + expectedOutput.diff(genresList).size == 0)

val minYear = getMinYear(moviesDf)
assert(minYear == 1874)

val maxYear = getMaxYear(moviesDf)
assert(maxYear == 2023)

#### Actividad 2:
##### TO DO -> Para la tabla "ratingsDf" el cliente requiere hacer un análisis de cada "movie_id" para tener una idea de qué tan buena es cada pelicula, asi que nos solicitó desarrollar un método que realice múltiples cálculos.
##### Generar un método que retorne un DataFrame con las columnas especificadas por cada "movie_id":
- ##### Firma: def calculateRatingValues(df: DataFrame): DataFrame
- ##### Columnas generadas:
    - ##### nombre: avg_rating, tipo: DoubleType() -> Valor rating promedio (Redondear a 2 decimales)
    - ##### nombre: stddev_rating, tipo: DoubleType() -> Desviacion estándar (stddev_pop) para la columna rating (Redondear a 2 decimales)
        - Para redondear valores utiliza la función round de Spark -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.round.html#pyspark.sql.functions.round
    - ##### nombre: count_rating, tipo: LongType() -> Total de calificaciones recibidas
- ##### ACTUALIZACIÓN: Para aquellas peliculas (movie_id) que no se tenga identificado su año en la tabla "movies_df" nos han solicitado calcular el año de la siguiente manera (más adelante nos preocuparemos por pegar el año a la tabla movies_df):
    - ##### nombre: min_time_rating, tipo: TimestampType() -> Fecha más antigua de la columna "time" en la que se asignó el primer rating
##### Nota 1: podemos hacer el calculo de "min_time_rating" en la misma transformación en las que se generan las columnas "avg_rating", "stddev_rating", y "count_rating"
##### Nota 2: Posiblemente requieras analizar las funciones de agregación existentes en Spark -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#aggregate-functions
* El dataframe de salida deberá contar con la siguiente estructura al hacer printSchema():
* |-- movie_id: string
* |-- avg_rating: double
* |-- stddev_rating: double
* |-- count_rating: long
* |-- min_time_rating: timestamp
##### NO UTILIZAR withColumn NI withColumnRenamed

In [None]:
// TU CODIGO VA EN ESTA CELDA:

def calculateRatingValues(df: DataFrame): DataFrame = 
    df //    modificar codigo interno

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
calculateRatingValues(ratingsDf).show(2)
"""
Ejemplo de salida esperada:
+--------+----------+-------------+------------+-------------------+
|movie_id|avg_rating|stddev_rating|count_rating|    min_time_rating|
+--------+----------+-------------+------------+-------------------+
|     296|      4.19|         0.95|      108756|1996-02-29 10:48:44|
|  115713|      3.99|         0.83|       21335|2015-01-02 06:05:51|
+--------+----------+-------------+------------+-------------------+
only showing top 2 rows
"""

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
import java.sql.Timestamp

val ratingValuesDf = calculateRatingValues(ratingsDf)

assert(ratingValuesDf.isInstanceOf[DataFrame])
assert(ratingValuesDf.columns.size == 5)
assert(ratingValuesDf.count() == 83239)

val data = Seq(Row("296", 4.19, 0.95, 108756L, Timestamp.valueOf("1996-02-29 10:48:44.0")))

val schema = t.StructType(Seq(
    t.StructField("movie_id", t.StringType),
    t.StructField("avg_rating", t.DoubleType),
    t.StructField("stddev_rating", t.DoubleType),
    t.StructField("count_rating", t.LongType),
    t.StructField("min_time_rating", t.TimestampType)))

val testDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
assert(ratingValuesDf.filter(f.col("movie_id") === "296").except(testDf).count() == 0)

#### Actividad 3:
##### TO DO -> El cliente ha solicitado generar dos tablas con el mismo contenido pero con distinto esquema a partir de la tabla "tagsDf", lee a continuación la información enviada con los requerimientos:
- ##### En ambas tablas se requiere agrupar por "movie_id" y calcular:
    - ##### El total de veces en las que aparece cada tag (en mayusculas) .
    - ##### Fecha más antigua en la que se asignó el primer tag.
La primer tabla deberá tener la siguiente estructura:  
*      | movie_id |             tag_count |        min_time_tag |
*      |        1 | [SCI-FI:2, TERROR:12] | 2017-06-02 07:20:27 |
*      |        3 |  [DRAMA:14, SCI-FI:4] | 2012-06-02 07:20:27 |
con esquema:
*      |-- movie_id: string
*      |-- tag_count: array
*      |    |-- element: string
*      |-- min_time_tag: timestamp
La segunda tabla deberá tener la siguiente estructura:
*      | movie_id |                   tag_count |        min_time_tag |
*      |        1 | [{SCI-FI, 2}, {TERROR, 12}] | 2017-06-02 07:20:27 |
*      |        3 |  [{DRAMA, 14}, {SCI-FI, 4}] | 2012-06-02 07:20:27 |
con esquema:
*      |-- movie_id: string
*      |-- tag_count: array
*      |    |-- element: struct
*      |    |    |-- tag: string
*      |    |    |-- count: long
*      |-- min_time_tag: timestamp
- ##### NOTA: Ordena la columna tag_count, la cual es un array, de forma ascendente.
- #### La generación de la primer tabla es a través del método con la firma:
    - ##### def getAct3Df1(df: DataFrame): DataFrame
- #### La generación de la segunda tabla es a través del método con la firma:
    - ##### def getAct3Df2(df: DataFrame): DataFrame

#### La siguiente información muestra los pasos recomendados para resolver la actividad 3, puedes omitir leer esta parte si asi lo consideras.
##### Para generar ambas tablas primero necesitamos obtener el total veces (count) en las que un "tag" se repite por cada "movie_id", para evitar conteos erroneos hay que convertir cada tag en Mayusculas.
##### El dataframe resultante deberá contener la siguiente estructura: "movie_id", "tag", "count" y "min" donde la columna "count" representa el total de veces que aparecen cada "tag"y "movie_id"; la columna "min" representa el valor mínimo de cada "tag" y "movie_id".
Por ejemplo, dado el dataframe de entrada
*      |user_id|movie_id|   tag|               time|
*      |    183|     100|sci-fi|2012-06-02 07:20:27|
*      |     12|     832| Drama|2017-06-01 07:20:27|
*      |    251|     100|SCI-FI|2009-06-04 07:20:27|
*      |    265|     832| DRAMA|2015-06-08 07:20:27|
*      |     22|     100|terror|2020-06-06 07:20:27|

debemos obtener el siguiente dataframe (el nombre de columnas "count" y "min" podria ser distinto):

*      | movie_id |    tag | count |                 min |
*      |      100 | TERROR |     1 | 2020-06-06 07:20:27 |
*      |      100 | SCI-FI |     2 | 2009-06-04 07:20:27 |
*      |      832 |  DRAMA |     2 | 2015-06-08 07:20:27 |

##### Para generar la primer tabla necesitamos concatenar las columnas "tag" y "count", posteriormente agrupando por "movie_id" generaremos la lista de elementos requerida. La columna "min_time_tag" representa el valor mínimo de cada "movie_id".
- ##### Funciones de Spark recomendadas:
    - upper -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.upper.html#pyspark.sql.functions.upper
    - concat_ws -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat_ws.html#pyspark.sql.functions.concat_ws
    - concat -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat
    - collect_list -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_list.html#pyspark.sql.functions.collect_list
    - collect_set -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html#pyspark.sql.functions.collect_set##### 
    - sort_array -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sort_array.html#pyspark-sql-functions-sort-array
##### La primer tabla quedará estructurada de la siguiente manera, donde la columna "tag_count" es de tipo ArrayType(StringType()) y la columna "min_time_tag" es de tipo TimestampType():
Dado el dataframe de entrada
*      | movie_id |    tag | count |                 min |
*      |        1 | TERROR |    12 | 2020-06-06 07:20:27 |
*      |        1 | SCI-FI |     2 | 2015-06-06 07:20:27 |
*      |        3 |  DRAMA |    14 | 2004-06-06 07:20:27 |
*      |        3 | SCI-FI |     4 | 2012-06-06 07:20:27 |
debemos obtener el siguiente dataframe
*       | movie_id |             tag_count |        min_time_tag |
*       |        1 | [SCI-FI:2, TERROR:12] | 2015-06-06 07:20:27 |
*       |        3 |  [DRAMA:14, SCI-FI:4] | 2004-06-06 07:20:27 |
##### Para generar la segunda tabla necesitamos agregar en una estructura las columnas "tag" y "count", posteriormente agrupando por movie_id generaremos la lista de elementos requerida. La columna "min_time_tag" representa el valor mínimo de cada "movie_id".
- ##### Funciones de Spark recomendadas:
    - upper -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.upper.html#pyspark.sql.functions.upper
    - struct -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.struct.html#pyspark.sql.functions.struct
    - collect_list -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_list.html#pyspark.sql.functions.collect_list
    - collect_set -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html#pyspark.sql.functions.collect_set
    - sort_array -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sort_array.html#pyspark-sql-functions-sort-array
##### La segunda tabla quedará estructurada de la siguiente manera, donde la columna "tag_count" de tipo ArrayType(StructType(StringType(),LongType()))  y la columna "min_time_tag" es de tipo TimestampType():
Dado el dataframe de entrada
*      | movie_id |    tag | count |                 min |
*      |        1 | TERROR |    12 | 2020-06-06 07:20:27 |
*      |        1 | SCI-FI |     2 | 2015-06-06 07:20:27 |
*      |        3 |  DRAMA |    14 | 2004-06-06 07:20:27 |
*      |        3 | SCI-FI |     4 | 2012-06-06 07:20:27 |
debemos obtener el siguiente dataframe
*      | movie_id |                   tag_count |        min_time_tag |
*      |        1 | [{SCI-FI, 2}, {TERROR, 12}] | 2015-06-06 07:20:27 |
*      |        3 |  [{DRAMA, 14}, {SCI-FI, 4}] | 2004-06-06 07:20:27 |

In [None]:
// TU CODIGO VA EN ESTA CELDA, PUEDES GENERAR MÉTODOS O VARIABLES NUEVAS SI ASI LO REQUIERES

def getAct3Df1(df: DataFrame): DataFrame =
    df // ... transformaciones a tagsDf

def getAct3Df2(df: DataFrame): DataFrame =
    df // ... transformaciones a tagsDf


val act3Df1: DataFrame = getAct3Df1(tagsDf)
val act3Df2: DataFrame = getAct3Df2(tagsDf)

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
act3Df1.show(2, false)
act3Df2.show(2, false)
"""
Ejemplo de salida esperada:
+--------+------------------------------------------------------------------+-------------------+
|movie_id|tag_count                                                         |min_time_tag       |
+--------+------------------------------------------------------------------+-------------------+
|100062  |[FATE : 1, PRESS-GANGED : 1, WAR : 1, WORLD WAR II : 1]           |2018-05-26 16:40:54|
|100070  |[COMEDIAN : 2, COMEDY : 1, GOOD HUMOUR : 1, STRUGGLING CAREER : 1]|2017-05-19 17:17:36|
+--------+------------------------------------------------------------------+-------------------+
only showing top 2 rows

+--------+----------------------------------------------------------------------+-------------------+
|movie_id|tag_count                                                             |min_time_tag       |
+--------+----------------------------------------------------------------------+-------------------+
|100062  |[{FATE, 1}, {PRESS-GANGED, 1}, {WAR, 1}, {WORLD WAR II, 1}]           |2018-05-26 16:40:54|
|100070  |[{COMEDIAN, 2}, {COMEDY, 1}, {GOOD HUMOUR, 1}, {STRUGGLING CAREER, 1}]|2017-05-19 17:17:36|
+--------+----------------------------------------------------------------------+-------------------+
only showing top 2 rows
"""

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
import java.sql.Timestamp

val expectedValueDf1 = Seq(Row("100070",
                               List("COMEDIAN : 2",
                                    "COMEDY : 1",
                                    "GOOD HUMOUR : 1",
                                    "STRUGGLING CAREER : 1"),
                               Timestamp.valueOf("2017-05-19 17:17:36.0")))
val schemaDf1 = t.StructType(Seq(
    t.StructField("movie_id", t.StringType),
    t.StructField("tag_count", t.ArrayType(t.StringType)),
    t.StructField("min_time_tag", t.TimestampType)
))
assert(act3Df1.columns.size == 3)
assert(act3Df1.columns.toSeq.contains("movie_id"))
assert(act3Df1.columns.toSeq.contains("tag_count"))
assert(act3Df1.columns.toSeq.contains("min_time_tag"))
assert(schema_to_ddl(act3Df1.select("movie_id", "tag_count", "min_time_tag")) == "movie_id STRING,tag_count ARRAY<STRING>,min_time_tag TIMESTAMP")
assert(act3Df1.count() == 53452)
val testDf1 = spark.createDataFrame(spark.sparkContext.parallelize(expectedValueDf1), schemaDf1)
assert(act3Df1.select("movie_id", "tag_count", "min_time_tag").filter(f.col("movie_id") === "100070").except(testDf1).count() == 0)

val expectedValueDf2 = Seq(Row("100070",
                               List(Row("COMEDIAN",2L),
                                    Row("COMEDY",1L),
                                    Row("GOOD HUMOUR",1L),
                                    Row("STRUGGLING CAREER",1L)),
                               Timestamp.valueOf("2017-05-19 17:17:36.0")))
val schemaDf2 = t.StructType(Seq(
    t.StructField("movie_id", t.StringType),
    t.StructField("tag_count", t.ArrayType(t.StructType(Seq(
        t.StructField("tag", t.StringType),
        t.StructField("count", t.LongType)
    )))),
    t.StructField("min_time_tag", t.TimestampType)
))
assert(act3Df2.columns.size == 3)
assert(act3Df2.columns.toSeq.contains("movie_id"))
assert(act3Df2.columns.toSeq.contains("tag_count"))
assert(act3Df2.columns.toSeq.contains("min_time_tag"))
assert(schema_to_ddl(act3Df2.select("movie_id", "tag_count", "min_time_tag")) == "movie_id STRING,tag_count ARRAY<STRUCT<tag: STRING, count: BIGINT>>,min_time_tag TIMESTAMP")
assert(act3Df2.count() == 53452)
val testDf2 = spark.createDataFrame(spark.sparkContext.parallelize(expectedValueDf2), schemaDf2)
assert(act3Df2.select("movie_id", "tag_count", "min_time_tag").filter(f.col("movie_id") === "100070").except(testDf2).count() == 0)

#### Actividad 4:
##### TO DO -> El cliente ha solicitado que resolvamos dos consultas con la salida de la actividad 3, como no especificó con cuál dataframe requiere la consulta lo realizaremos con ambos dataframes (act3Df1 y act3Df2).
##### Las consultas a resolver son:
- ##### 1.- ¿Cuál es la pelicula (movie_id) con más tags con el valor "SCI-FI"?
    - ##### Estrictamente especificó no tomar en cuenta datos como "REALISTIC SCI-FI", "HARD SCI-FI", etc.
- ##### 2.- ¿Cuántas peliculas fueron etiquetadas como "SCI-FI"?
##### La forma de resolver estas consultas será a través de un método el cual va a retornar una tupla (str, int), donde el primer elemento (str) representa el resultado de la consulta 1 y el segundo elemento (int) representa el resultado de la consulta 2
- ##### La firma del método que utilizará cada dataFrame es:
    - ##### def exercise4Df1(df: DataFrame): (String, Long) -> firma para el dataFrame act3Df1
    - ##### def exercise4Df2(df: DataFrame): (String, Long) -> firma para el dataFrame act3Df2
##### Funciones de Spark recomendadas:
- explode -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html#pyspark-sql-functions-explode
- regexp_extract -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_extract.html#pyspark.sql.functions.regexp_extract
##### Funciones de la clase Column recomendadas:
- like -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.like.html#pyspark.sql.Column.like
- ilike -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.ilike.html#pyspark.sql.Column.ilike
- getField -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getField.html#pyspark.sql.Column.getField
##### Acciones que podrias utilizar:
- count, first, head, take

In [None]:
// TU CODIGO VA EN ESTA CELDA, PUEDES GENERAR MÉTODOS O VARIABLES SI ASI LO REQUIERES

def exercise4Df1(df: DataFrame): (String, Long) =
    (df.first.getAs[String]("movie_id"), df.count()) // modificar codigo interno

def exercise4Df2(df: DataFrame): (String, Long) =
    (df.first.getAs[String]("movie_id"), df.count()) // modificar codigo interno

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
println(exercise4Df1(act3Df1))
println(exercise4Df2(act3Df2))
// Salida esperada en ambos casos
//('260', 854)

In [None]:
val resultExercise4Df1 = exercise4Df1(act3Df1)
assert(resultExercise4Df1._1 == "260")
assert(resultExercise4Df1._2 == 854)

val resultExercise4Df2 = exercise4Df2(act3Df2)
assert(resultExercise4Df2._1 == "260")
assert(resultExercise4Df2._2 == 854)

In [None]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
val dfs = Seq((moviesDf, "07/movies"),
              (act3Df1, "07/tags_p1"),
              (act3Df2, "07/tags_p2"),
              (calculateRatingValues(ratingsDf), "07/ratings"))

writeTmpDf(dfs)