#### Hasta este punto tenemos nuestras tablas procesadas (movies, ratings y tags) de forma correcta. El cliente nos ha solicitado apoyar al departamento de Marketing a realizar algunas consultas y a generar una única tabla y realizar algunos ajustes a la tabla final antes de poder almacenarla.

##### Nota: Para poder trabajar con este notebook es necesario haber terminado el ejercicio de la sesión 07

In [1]:
%%HTML <style>pre { white-space: pre !important; }</style>

In [2]:
// NO MODIFICAR CONTENIDO DE ESTA CELDA
import org.apache.spark.sql.{SparkSession, DataFrame, Column, Row}
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.{types => t}

def difference(l1: Seq[String], l2: Seq[String]): Seq[Column] =
    l1.diff(l2).map(colName => f.col(colName))

def readTmpDf(dfSeq: Seq[String]): Map[String, DataFrame] =
    dfSeq.map(table_name => (table_name, spark.read.parquet("../../resources/data/tmp/parquet/" + table_name))).toMap

def writeTmpDf(dfSeq: Seq[(DataFrame, String)]): Unit = 
    dfSeq.foreach{case (df: DataFrame, name: String) => df.write.mode("overwrite").parquet("../../resources/data/tmp/parquet/" + name)}

def schema_to_ddl(df: DataFrame): String = df.schema.toDDL.replace(" NOT NULL", "")

import org.apache.spark.sql.{functions=>f}
import org.apache.spark.sql.{types=>t}
difference: (l1: Seq[String], l2: Seq[String])Seq[org.apache.spark.sql.Column]
readTmpDf: (dfSeq: Seq[String])Map[String,org.apache.spark.sql.DataFrame]
writeTmpDf: (dfSeq: Seq[(org.apache.spark.sql.DataFrame, String)])Unit
schema_to_ddl: (df: org.apache.spark.sql.DataFrame)String


In [3]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

// Creación de sesión de Spark
val spark = SparkSession.builder
    .master("local[*]")
    .appName("ejercicio_8")
    .getOrCreate()

spark.conf.set("spark.sql.session.timeZone", "GMT-6")

// Carga de tablas requeridas
val RootPath = "../../resources/data/tmp/parquet/"
val namesList = Seq("07/movies", "07/ratings", "07/tags_p2")
val dfMap = readTmpDf(namesList)

val moviesDf = dfMap("07/movies")
val ratingsDf = dfMap("07/ratings")
val tagsDf = dfMap("07/tags_p2")

moviesDf.show(1, false)
ratingsDf.show(1)
tagsDf.show(1)

+--------+----------------+-------------------------------------------------+----+
|movie_id|title           |genres                                           |year|
+--------+----------------+-------------------------------------------------+----+
|1       |Toy Story (1995)|[Adventure, Animation, Children, Comedy, Fantasy]|1995|
+--------+----------------+-------------------------------------------------+----+
only showing top 1 row

+--------+----------+-------------+------------+-------------------+
|movie_id|avg_rating|stddev_rating|count_rating|    min_time_rating|
+--------+----------+-------------+------------+-------------------+
|    1049|       3.4|         0.97|        5816|1996-10-10 02:58:19|
+--------+----------+-------------+------------+-------------------+
only showing top 1 row

+--------+--------------------+-------------------+
|movie_id|           tag_count|       min_time_tag|
+--------+--------------------+-------------------+
|  100001|[{PERFORMANCE (MO...|2018-

tagsDf = [movie_id: string, tag_count: array<struct<tag:string,...


spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@194557fd
RootPath: String = ../../resources/data/tmp/parquet/
namesList: Seq[String] = List(07/movies, 07/ratings, 07/tags_p2)
dfMap: Map[String,org.apache.spark.sql.DataFrame] = Map(07/movies -> [movie_id: string, title: string ... 2 more fields], 07/ratings -> [movie_id: string, avg_rating: double ... 3 more fields], 07/tags_p2 -> [movie_id: string, tag_count: array<struct<tag:string,count:bigint>> ... 1 more field])
moviesDf: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 2 more fields]
ratingsDf: org.apache.spark.sql.DataFrame = [movie_id: string, avg_rating: double ... 3 more fields]


[movie_id: string, tag_count: array<struct<tag:string,...

#### En la siguiente imagen se muestra una representación a traves del diagrama de Venn sobre cada tabla (moviesDf, ratingsDf y tagsDf) la cual fue contruida por el departamento de Marketing, el problema es que no saben la cantidad de datos de cada conjunto.
##### El cliente nos han solicitado obtener la cantidad de registros de cada conjunto de datos representado por las siguientes letras:
- ##### A: Registros de moviesDf que no tiene filas coincidentes con las tablas ratingsDf y tagsDf
- ##### B: Registros de ratingsDf que no tiene filas coincidentes con las tablas moviesDf y tagsDf
- ##### C: Registros de tagsDf que no tiene filas coincidentes con las tablas moviesDf y ratingsDf
- ##### D: Registros de moviesDf y ratingsDf que no tiene filas coincidentes con la tabla tagsDf
- ##### E: Registros de moviesDf y tagsDf que no tiene filas coincidentes con la tabla ratingsDf
- ##### F: Registros de ratingsDf y tagsDf que no tiene filas coincidentes con la tabla moviesDf
- ##### G: Registros que contiene datos coincidentes en las tablas moviesDf, ratingsDf y tagsDf
-  Una tabla tiene registros coincidentes con otra cuando comparten el mismo valor en la columna "movie_id"

![title](../../resources/img/Tablas_conjuntos.png)

#### Actividad 1:
##### TO DO ->    Obtener los conjuntos de datos listados en el diagrama de Venn con operaciones de tipo join, con la finalidad de ahorrar recursos el cliente nos ha solicitado que en la salida de cada transformación el dataframe resultante contenga únicamente la columna "movie_id" (basta con utilizar únicante una trasnsformación select al inicio de las operaciones join y utilizar únicamente joins del tipo **left_semi** y **left_anti**).
- ##### Los dataframes resultantes se almacenarán en las siguientes variables:
    - ##### A_df -> Conjunto A
    - ##### B_df -> Conjunto B
    - ##### C_df -> Conjunto C
    - ##### D_df -> Conjunto D
    - ##### E_df -> Conjunto E
    - ##### F_df -> Conjunto F
    - ##### G_df -> Conjunto G

In [14]:
// Método para conjuntos A, B y C:
def operacionConjuntosCaso1(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val df1JoinDf2 = df1.select("movie_id").join(df2,Seq("movie_id"),"left_semi")
    val df1JoinDf3 = df1.select("movie_id").join(df3,Seq("movie_id"),"left_semi")
    val restaJoinsDf1jdf2Df1jdf3 = df1.select("movie_id")
        .join(df1JoinDf2,Seq("movie_id"),"left_anti")
        .join(df1JoinDf3,Seq("movie_id"),"left_anti")
    return restaJoinsDf1jdf2Df1jdf3
}

operacionConjuntosCaso1: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [15]:
val casoA = operacionConjuntosCaso1(moviesDf, ratingsDf, tagsDf)
casoA.show()
println(casoA.count())

+--------+
|movie_id|
+--------+
+--------+

0


casoA = [movie_id: string]


[movie_id: string]

In [16]:
// Método para conjuntos D, E y F:
def operacionConjuntosCaso2(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val df1_join_df2 = df1.select("movie_id").join(df2,Seq("movie_id"),"left_semi")
    val resta_joins_df1jdf2 = df1_join_df2.select("movie_id").join(df3,Seq("movie_id"),"left_anti")
    return resta_joins_df1jdf2
}

operacionConjuntosCaso2: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [17]:
// Caso C:
val casoC = operacionConjuntosCaso2(moviesDf, ratingsDf, tagsDf)
casoC.show(1)
println(casoC.count())

+--------+
|movie_id|
+--------+
|      51|
+--------+
only showing top 1 row

28815


casoC = [movie_id: string]


[movie_id: string]

In [20]:
// Método para conjunto G:
def operacionConjuntosCaso3(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = { 
    val df1_join_df2 = df1.select("movie_id").join(df2,Seq("movie_id"),"left_semi")
    val df1jdf2_join_df3 = df1_join_df2.select("movie_id").join(df3,Seq("movie_id"),"left_semi")
    return df1jdf2_join_df3
}

operacionConjuntosCaso3: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [22]:
val casoG = operacionConjuntosCaso3(moviesDf,ratingsDf,tagsDf)
casoG.show(1)
println(casoG.count())

+--------+
|movie_id|
+--------+
|       1|
+--------+
only showing top 1 row

47903


casoG = [movie_id: string]


[movie_id: string]

In [23]:
// TU CODIGO VA EN ESTA CELDA:

// Conjunto A:
val Adf = operacionConjuntosCaso1(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto B:
val Bdf = operacionConjuntosCaso1(ratingsDf, moviesDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto C:
val Cdf = operacionConjuntosCaso1(tagsDf, moviesDf, ratingsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto D:
val Ddf = operacionConjuntosCaso2(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto E:
val Edf = operacionConjuntosCaso2(moviesDf, tagsDf, ratingsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto F:
val Fdf = operacionConjuntosCaso2(ratingsDf, tagsDf, moviesDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto G:
val Gdf = operacionConjuntosCaso3(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf

Gdf = [movie_id: string]


Adf: org.apache.spark.sql.DataFrame = [movie_id: string]
Bdf: org.apache.spark.sql.DataFrame = [movie_id: string]
Cdf: org.apache.spark.sql.DataFrame = [movie_id: string]
Ddf: org.apache.spark.sql.DataFrame = [movie_id: string]
Edf: org.apache.spark.sql.DataFrame = [movie_id: string]
Fdf: org.apache.spark.sql.DataFrame = [movie_id: string]


[movie_id: string]

In [25]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
Bdf.show(1)
/*"""
Ejemplo de salida de cada dataframe (desde A hasta G):
+--------+
|movie_id|
+--------+
|  179995|
+--------+
only showing top 1 row
"""*/

+--------+
|movie_id|
+--------+
|  179995|
+--------+
only showing top 1 row



In [26]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

assert(Adf.count() == 0)
assert(Bdf.count() == 4270)
assert(Cdf.count() == 539)
assert(Ddf.count() == 28815)
assert(Edf.count() == 2759)
assert(Fdf.count() == 2251)
assert(Gdf.count() == 47903)

assert(Adf.columns.size == 1)
assert(Bdf.columns.size == 1)
assert(Cdf.columns.size == 1)
assert(Ddf.columns.size == 1)
assert(Edf.columns.size == 1)
assert(Fdf.columns.size == 1)
assert(Gdf.columns.size == 1)

assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))
assert(Adf.columns.toSeq.contains("movie_id"))

#### Actividad 2:
##### TO DO ->    Algunos administradores de base de datos no comprenden el uso de los joins de tipo left_semi y left_anti, por lo tanto el cliente ha solicitado que tambien se realicen las transformaciones con cualquiera de los siguientes tipos de join: **left, right, inner, outer**. Podrías utilizar la transformación filter/where en algunos casos.
##### No existe alguna reestricción de qué columnas contiene el dataframe de salida.
- ##### Los dataframes resultantes se almacenarán en las siguientes variables:
    - ##### A_df -> Conjunto A
    - ##### B_df -> Conjunto B
    - ##### C_df -> Conjunto C
    - ##### D_df -> Conjunto D
    - ##### E_df -> Conjunto E
    - ##### F_df -> Conjunto F
    - ##### G_df -> Conjunto G

In [46]:
// Operaciones de conjuntos pero sin usar left_semi ni left_anti:
// Caso A, B y C con join left:
def opConj1 (df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val primera_union = df1.join(df2,Seq("movie_id"),"left")
    val segunda_union = primera_union.join(df3,Seq("movie_id"),"left")
    val coincidencias_filtro = segunda_union.filter(df2("movie_id").isNull && df3("movie_id").isNull)
    return coincidencias_filtro
}

lastException: Throwable = null
opConj1: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [49]:
// Test A:
//val casoA2 = opConj1(moviesDf, ratingsDf, tagsDf)
//casoA2.show(1)
//println(casoA2.count())
val casoB2 = opConj1(ratingsDf, moviesDf, tagsDf)
casoB2.show(1)
println(casoB2.count())

+--------+----------+-------------+------------+-------------------+-----+------+----+---------+------------+
|movie_id|avg_rating|stddev_rating|count_rating|    min_time_rating|title|genres|year|tag_count|min_time_tag|
+--------+----------+-------------+------------+-------------------+-----+------+----+---------+------------+
|  179995|      3.65|         0.74|          17|2017-11-03 00:43:44| null|  null|null|     null|        null|
+--------+----------+-------------+------------+-------------------+-----+------+----+---------+------------+
only showing top 1 row

4270


casoB2 = [movie_id: string, avg_rating: double ... 8 more fields]


[movie_id: string, avg_rating: double ... 8 more fields]

In [52]:
// Caso D, E y F:
/* df.columns sería como df.columns.head, df.columns.tail: _* ya que df.columns regresa un arreglo de nombres de columnas,
necesitamos operar sólo con la primer columna y luego obtener el resto*/
def opConj2(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val union_uno = df1.join(df3,Seq("movie_id"),"left")
    val union_dos = df2.join(df3,Seq("movie_id"),"left")
    val filtro_uno = union_uno.filter(df3("movie_id").isNull).select(df1.columns.head, df1.columns.tail: _*)
    val filtro_dos = union_dos.filter(df3("movie_id").isNull).select(df2.columns.head, df2.columns.tail: _*)
    val union_final = filtro_uno.join(filtro_dos,Seq("movie_id"),"inner")
    return union_final
}

lastException: Throwable = null
opConj2: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [54]:
// Test:
val casoD2 = opConj2(moviesDf, ratingsDf, tagsDf)
//val casoE2 = opConj2(moviesDf, tagsDf, ratingsDf)
//val casoF2 = opConj2(ratingsDf, tagsDf, moviesDf)
casoD2.show(1)
println(casoD2.count())
//casoE2.show(1)
//println(casoE2.count())
//casoF2.show(1)
//println(casoF2.count())

+--------+--------------------+---------+----+----------+-------------+------------+-------------------+
|movie_id|               title|   genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|
+--------+--------------------+---------+----+----------+-------------+------------+-------------------+
|    1159|Love in Bloom (1935)|[Romance]|1935|      2.88|         1.05|          20|1996-10-11 04:28:21|
+--------+--------------------+---------+----+----------+-------------+------------+-------------------+
only showing top 1 row

28815


casoD2 = [movie_id: string, title: string ... 6 more fields]


[movie_id: string, title: string ... 6 more fields]

In [55]:
//Caso G - Registros que contiene datos coincidentes en las tablas movies_df, ratings_df y tags_df:
def opConj3(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val union_uno = df1.join(df2,Seq("movie_id"), "inner")
    val union_dos = union_uno.join(df3,Seq("movie_id"),"inner")
    return union_dos
}

opConj3: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [56]:
// TU CODIGO VA EN ESTA CELDA:

// Conjunto A:
val A_df = opConj1(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto B:
val B_df = opConj1(ratingsDf, moviesDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto C:
val C_df = opConj1(tagsDf, moviesDf, ratingsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto D:
val D_df = opConj2(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto E:
val E_df = opConj2(moviesDf, tagsDf, ratingsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto F:
val F_df = opConj2(ratingsDf, tagsDf, moviesDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf
// Conjunto G:
val G_df = opConj3(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf

G_df = [movie_id: string, title: string ... 8 more fields]


A_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 8 more fields]
B_df: org.apache.spark.sql.DataFrame = [movie_id: string, avg_rating: double ... 8 more fields]
C_df: org.apache.spark.sql.DataFrame = [movie_id: string, tag_count: array<struct<tag:string,count:bigint>> ... 8 more fields]
D_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 6 more fields]
E_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 4 more fields]
F_df: org.apache.spark.sql.DataFrame = [movie_id: string, avg_rating: double ... 5 more fields]


[movie_id: string, title: string ... 8 more fields]

In [57]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

assert(A_df.count() == 0)
assert(B_df.count() == 4270)
assert(C_df.count() == 539)
assert(D_df.count() == 28815)
assert(E_df.count() == 2759)
assert(F_df.count() == 2251)
assert(G_df.count() == 47903)

#### Actividad 3:
##### TO DO ->    Con operaciones join genera un dataframe que contenga la union de todos los conjuntos (desde el A hasta el G) sin repetir registros. No utilices las transformaciones de union.

In [59]:
// Definición de método:
def joinUniverso(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame = {
    val joinDf1Df2 = df1.join(df2,Seq("movie_id"),"outer")
    val join_DataFrames = joinDf1Df2.join(df3,Seq("movie_id"),"outer")
    return join_DataFrames
}

joinUniverso: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame, df3: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [60]:
// TEST:
val TESTval = joinUniverso(moviesDf, ratingsDf, tagsDf)
TESTval.show(1)
print(TESTval.count())

+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|movie_id|               title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|  100062|My Way (Mai Wei) ...|[Action, Drama, War]|2011|      3.63|         0.83|          64|2014-03-11 21:23:33|[{FATE, 1}, {PRES...|2018-05-26 16:40:54|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
only showing top 1 row

86537

TESTval = [movie_id: string, title: string ... 8 more fields]


[movie_id: string, title: string ... 8 more fields]

In [61]:
// TU CODIGO VA EN ESTA CELDA:

val universeDf = joinUniverso(moviesDf, ratingsDf, tagsDf) // aplicar transformaciones join a moviesDf, ratingsDf y tagsDf

universeDf = [movie_id: string, title: string ... 8 more fields]


[movie_id: string, title: string ... 8 more fields]

In [62]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
universeDf.show(1)
"""
Ejemplo de salida:
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|movie_id|               title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|  100062|My Way (Mai Wei) ...|[Action, Drama, War]|2011|      3.63|         0.83|          64|2014-03-11 21:23:33|[{PRESS-GANGED, 1...|2018-05-26 16:40:54|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
only showing top 1 row
"""

+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|movie_id|               title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|  100062|My Way (Mai Wei) ...|[Action, Drama, War]|2011|      3.63|         0.83|          64|2014-03-11 21:23:33|[{FATE, 1}, {PRES...|2018-05-26 16:40:54|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
only showing top 1 row



"
Ejemplo de salida:
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
movie_id|               title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
  100062|My Way (Mai Wei) ...|[Action, Drama, War]|2011|      3.63|         0.83|          64|2014-03-11 21:23:33|[{PRESS-GANGED, 1...|2018-05-26 16:40:54|
+--------+--------------------+--------------------+----+----------+-------------+------------+-------------------+------------...


In [68]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
import java.sql.Timestamp

val data = Seq(Row("100062",
                   "My Way (Mai Wei) (2011)",
                   Seq("Action", "Drama", "War"),
                   2011,
                   3.63,
                   0.83,
                   64L,
                   ("2014-03-11 21:23:33.0"),
                   Seq(Row("FATE",1L),
                       Row("PRESS-GANGED",1L),
                       Row("WAR",1L),
                       Row("WORLD WAR II",1L)),
                   ("2018-05-26 16:40:54.0")))
val schema = t.StructType(Seq(
    t.StructField("movie_id", t.StringType),
    t.StructField("title", t.StringType),
    t.StructField("genres", t.ArrayType(t.StringType)),
    t.StructField("year", t.IntegerType),
    t.StructField("avg_rating", t.DoubleType),
    t.StructField("stddev_rating", t.DoubleType),
    t.StructField("count_rating", t.LongType),
    t.StructField("min_time_rating", t.StringType),
    t.StructField("tag_count", t.ArrayType(t.StructType(Seq(
        t.StructField("tag", t.StringType),
        t.StructField("count", t.LongType)
    )))),
    t.StructField("min_time_tag", t.StringType)
    ))
val testDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    .withColumn("min_time_rating", f.col("min_time_rating").cast(t.TimestampType))
    .withColumn("min_time_tag", f.col("min_time_tag").cast(t.TimestampType))

val expectedColumns = Seq("movie_id", "title", "genres", "year",
                          "avg_rating", "stddev_rating", "count_rating",
                          "min_time_rating", "tag_count", "min_time_tag")

assert(universeDf.count() == 86537)
assert(universeDf.columns.diff(expectedColumns).size + expectedColumns.diff(universeDf.columns).size == 0)
assert(universeDf
    .select(expectedColumns.map(f.col):_*)
    .filter(f.col("movie_id") === "100062")
    .except(testDf).count() == 0)

schema = StructType(StructField(movie_id,StringType,true),StructField(title,StringType,true),StructField(genres,ArrayType(StringType,true),true),StructField(year,IntegerType,true),StructField(avg_rating,DoubleType,true),StructField(stddev_rating,DoubleType,true),StructField(count_rating,LongType,true),StructField(min_time_rating,StringType,true),StructField(tag_count,ArrayType(StructType(StructField(tag,StringType,true),StructField(count,LongType,true)),true),true),StructField(min_time_tag,StringType,...


lastException: Throwable = null
data: Seq[org.apache.spark.sql.Row] = List([100062,My Way (Mai Wei) (2011),List(Action, Drama, War),2011,3.63,0.83,64,2014-03-11 21:23:33.0,List([FATE,1], [PRESS-GANGED,1], [WAR,1], [WORLD WAR II,1]),2018-05-26 16:40:54.0])


StructType(StructField(movie_id,StringType,true),StructField(title,StringType,true),StructField(genres,ArrayType(StringType,true),true),StructField(year,IntegerType,true),StructField(avg_rating,DoubleType,true),StructField(stddev_rating,DoubleType,true),StructField(count_rating,LongType,true),StructField(min_time_rating,StringType,true),StructField(tag_count,ArrayType(StructType(StructField(tag,StringType,true),StructField(count,LongType,true)),true),true),StructField(min_time_tag,StringType,...

#### Actividad 4:
##### TO DO ->    La estructura del dataframe final de acuerdo al análisis realizado por marketing es el dado por la union del conjunto de datos: A, D, E y G. Realiza este proceso y almacena el dataframe resultante en la variable "finalDf"
##### 

In [71]:
// Definiciones:
val A_df = opConj1(moviesDf, ratingsDf, tagsDf)
val D_df = opConj2(moviesDf, ratingsDf, tagsDf)
val E_df = opConj2(moviesDf, tagsDf, ratingsDf)
val G_df = opConj3(moviesDf, ratingsDf, tagsDf)

G_df = [movie_id: string, title: string ... 8 more fields]


A_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 8 more fields]
D_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 6 more fields]
E_df: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 4 more fields]


[movie_id: string, title: string ... 8 more fields]

In [78]:
// TU CODIGO VA EN ESTA CELDA:
// aplicar transformaciones join a movies_df, ratings_df y tags_df
val unionUno = A_df.unionByName(D_df, allowMissingColumns  = true)
val unionDos = unionUno.unionByName(E_df, allowMissingColumns  = true)
val unionTres = unionDos.unionByName(G_df, allowMissingColumns  = true)
val finalDf = unionTres

finalDf = [movie_id: string, title: string ... 8 more fields]


unionUno: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movie_id: string, title: string ... 8 more fields]
unionDos: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movie_id: string, title: string ... 8 more fields]
unionTres: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [movie_id: string, title: string ... 8 more fields]


[movie_id: string, title: string ... 8 more fields]

In [79]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
finalDf.show(1)
"""
Ejemplo de salida:
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|movie_id|           title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
|       1|Toy Story (1995)|[Adventure, Anima...|1995|      3.89|         0.93|       76813|1996-01-28 18:00:00|[{TIME TRAVEL, 11...|2006-01-12 19:19:35|
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
only showing top 1 row
"""

+--------+--------------------+---------+----+----------+-------------+------------+-------------------+---------+------------+
|movie_id|               title|   genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|tag_count|min_time_tag|
+--------+--------------------+---------+----+----------+-------------+------------+-------------------+---------+------------+
|    1159|Love in Bloom (1935)|[Romance]|1935|      2.88|         1.05|          20|1996-10-11 04:28:21|     null|        null|
+--------+--------------------+---------+----+----------+-------------+------------+-------------------+---------+------------+
only showing top 1 row



"
Ejemplo de salida:
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
movie_id|           title|              genres|year|avg_rating|stddev_rating|count_rating|    min_time_rating|           tag_count|       min_time_tag|
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-------------------+
       1|Toy Story (1995)|[Adventure, Anima...|1995|      3.89|         0.93|       76813|1996-01-28 18:00:00|[{TIME TRAVEL, 11...|2006-01-12 19:19:35|
+--------+----------------+--------------------+----+----------+-------------+------------+-------------------+--------------------+-----------...


In [80]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

import java.sql.Timestamp

val data = Seq(Row("100062",
                   "My Way (Mai Wei) (2011)",
                   Seq("Action", "Drama", "War"),
                   2011,
                   3.63,
                   0.83,
                   64L,
                   ("2014-03-11 21:23:33.0"),
                   Seq(Row("FATE",1L),
                       Row("PRESS-GANGED",1L),
                       Row("WAR",1L),
                       Row("WORLD WAR II",1L)),
                   ("2018-05-26 16:40:54.0")))
val schema = t.StructType(Seq(
    t.StructField("movie_id", t.StringType),
    t.StructField("title", t.StringType),
    t.StructField("genres", t.ArrayType(t.StringType)),
    t.StructField("year", t.IntegerType),
    t.StructField("avg_rating", t.DoubleType),
    t.StructField("stddev_rating", t.DoubleType),
    t.StructField("count_rating", t.LongType),
    t.StructField("min_time_rating", t.StringType),
    t.StructField("tag_count", t.ArrayType(t.StructType(Seq(
        t.StructField("tag", t.StringType),
        t.StructField("count", t.LongType)
    )))),
    t.StructField("min_time_tag", t.StringType)
    ))
val testDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    .withColumn("min_time_rating", f.col("min_time_rating").cast(t.TimestampType))
    .withColumn("min_time_tag", f.col("min_time_tag").cast(t.TimestampType))

assert(finalDf.count() == 79477)
assert(finalDf.columns.diff(expectedColumns).size + expectedColumns.diff(finalDf.columns).size == 0)
assert(finalDf
    .select(expectedColumns.map(f.col):_*)
    .filter(f.col("movie_id") === "100062")
    .except(testDf).count() == 0)

schema = StructType(StructField(movie_id,StringType,true),StructField(title,StringType,true),StructField(genres,ArrayType(StringType,true),true),StructField(year,IntegerType,true),StructField(avg_rating,DoubleType,true),StructField(stddev_rating,DoubleType,true),StructField(count_rating,LongType,true),StructField(min_time_rating,StringType,true),StructField(tag_count,ArrayType(StructType(StructField(tag,StringType,true),StructField(count,LongType,true)),true),true),StructField(min_time_tag,StringType,...


data: Seq[org.apache.spark.sql.Row] = List([100062,My Way (Mai Wei) (2011),List(Action, Drama, War),2011,3.63,0.83,64,2014-03-11 21:23:33.0,List([FATE,1], [PRESS-GANGED,1], [WAR,1], [WORLD WAR II,1]),2018-05-26 16:40:54.0])


StructType(StructField(movie_id,StringType,true),StructField(title,StringType,true),StructField(genres,ArrayType(StringType,true),true),StructField(year,IntegerType,true),StructField(avg_rating,DoubleType,true),StructField(stddev_rating,DoubleType,true),StructField(count_rating,LongType,true),StructField(min_time_rating,StringType,true),StructField(tag_count,ArrayType(StructType(StructField(tag,StringType,true),StructField(count,LongType,true)),true),true),StructField(min_time_tag,StringType,...

#### Actividad 5:
##### TO DO ->    El cliente nos ha solicitado llenar los valores "null" de la columna "year", para esto nos ha pedido seguir la siguiente regla:
- ##### Si la columna "year" es null y si la columna "min_time_rating" es null colocar el año de la columna "min_time_tag".
- ##### Si la columna "year" es null y si la columna "min_time_tag" es null colocar el año de la columna "min_time_rating".
- ##### Si la columna "year" es null, y las columnas "min_time_rating" y "min_time_tag" son distintas de null colocar el año menor de las columnas "min_time_rating" y "min_time_tag", en caso de que el año en ambas columnas sea el mismo colocar dicho año.
- ##### En cualquier otro caso mantener el valor entero 1970
##### Adicional nos ha solicitado generar una columna llamada "year_type" con los siguientes valores:
- ##### Si la columna "year" venia con un valor distinto a null, asignar el valor "YO"
- ##### Si la columna "year" es null y si la columna "min_time_rating" es null colocar "YT"
- ##### Si la columna "year" es null y si la columna "min_time_tag" es null colocar "YR"
- ##### Si la columna "year" es null, y las columnas "min_time_rating" y "min_time_tag" son distintas de null:
    - ##### Si "min_time_rating" es menor a "min_time_tag" colocar "YR"
    - ##### Si "min_time_tag" es menor a "min_time_rating" colocar "YT"
    - ##### Si "min_time_tag" es igual a "min_time_rating" colocar "YRT"
- ##### En cualquier otro caso mantener "YU"
##### Al finalizar este proceso eliminar las columnas "min_time_rating" y "min_time_tag"
- Para resolver estos ejercicios podrías utilizar la función year -> https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.year.html#pyspark.sql.functions.year
##### Este proceso se desarrollará en un método con la firma: def getLastMoviesDf(df: DataFrame): DataFrame
##### La estructura del dataframe de salida será:
* |-- movie_id: string
* |-- title: string
* |-- avg_rating: double
* |-- count_rating: long
* |-- stddev_rating: double
* |-- genres: array
* |--- |-- element: string
* |-- tag_count: array
* |--- |-- element: struct
* |--- |--- |tag: string
* |--- |--- |count: long
* |-- year: integer
* |-- year_type: string
##### NO UTILIZAR withColumn NI withColumnRenamed NI UDF

In [87]:
// TU CODIGO VA EN ESTA CELDA:

def getLastMoviesDf(df: DataFrame): DataFrame = {
    val null_year = f.col("year").isNull
    val null_mt_rat = f.col("min_time_rating").isNull
    val null_mt_tag = f.col("min_time_tag").isNull
    val not_null_mt_rat = f.col("min_time_rating").isNotNull
    val not_null_mt_tag = f.col("min_time_tag").isNotNull
    val año_mttag = f.year(f.col("min_time_tag"))
    val año_mtrat = f.year(f.col("min_time_rating"))
    
    val year_nulo = df.filter(null_year)
        .select(
            f.col("movie_id"),
            f.col("title"),
            f.col("avg_rating"),
            f.col("count_rating"),
            f.col("stddev_rating"),
            f.col("genres"),
            f.col("tag_count"),
            f.when(null_mt_rat && not_null_mt_tag, año_mttag)
                .when(null_mt_tag && not_null_mt_rat, año_mtrat)
                .when(not_null_mt_rat && not_null_mt_tag, f.least(f.year(f.col("min_time_rating")), f.year(f.col("min_time_tag"))))
                .otherwise(1970).alias("year"),
            f.when(((not_null_mt_rat && not_null_mt_tag) && (f.year(f.col("min_time_rating")) < f.year(f.col("min_time_tag")))) || null_mt_tag, 
                   f.lit("YR").alias("year_type"))
                .when(((not_null_mt_rat && not_null_mt_tag) && (f.year(f.col("min_time_tag")) < f.year(f.col("min_time_rating")))) || null_mt_rat, 
                      f.lit("YT").alias("year_type"))
                .when(f.year(f.col("min_time_rating")) === f.year(f.col("min_time_tag")), f.lit("YRT").alias("year_type"))
                .otherwise(f.lit("YU")).alias("year_type")
        ).drop("min_time_rating", "min_time_tag")
    
    val year_no_nulo = df.filter(f.col("year").isNotNull)
        .select(
            f.col("movie_id"),
            f.col("title"),
            f.col("avg_rating"),
            f.col("count_rating"),
            f.col("stddev_rating"),
            f.col("genres"),
            f.col("tag_count"),
            f.col("year"),
            f.when(f.col("year").isNotNull, f.lit("YO").alias("year_type"))
        ).drop("min_time_rating", "min_time_tag")

    val resultDf = year_nulo.union(year_no_nulo)
    resultDf
}
    //df // transformaciones a finalDf

getLastMoviesDf: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [88]:
// TEST:
/* val moviesDf = getLastMoviesDf(finalDf)
moviesDf.where(f.col("movie_id") == 1).show(1,false)*/

Syntax Error.: 

In [89]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

val moviesDf = getLastMoviesDf(finalDf)
moviesDf.show(1)
"""
Ejemplo de salida esperada (el orden de la columnas podría ser distinto):
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+---------+
|movie_id|           title|avg_rating|count_rating|stddev_rating|              genres|           tag_count|year|year_type|
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+---------+
|       1|Toy Story (1995)|      3.89|       76813|         0.93|[Adventure, Anima...|[{1990S, 1}, {200...|1995|       YO|
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+---------+
only showing top 1 row
"""

+--------+------------+----------+------------+-------------+--------------------+---------+----+---------+
|movie_id|       title|avg_rating|count_rating|stddev_rating|              genres|tag_count|year|year_type|
+--------+------------+----------+------------+-------------+--------------------+---------+----+---------+
|  148093|The Republic|      2.33|           3|         0.62|[Action, Crime, T...|     null|2015|       YR|
+--------+------------+----------+------------+-------------+--------------------+---------+----+---------+
only showing top 1 row



moviesDf: org.apache.spark.sql.DataFrame = [movie_id: string, title: string ... 7 more fields]
"
Ejemplo de salida esperada (el orden de la columnas podr?a ser distinto):
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+---------+
movie_id|           title|avg_rating|count_rating|stddev_rating|              genres|           tag_count|year|year_type|
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+---------+
       1|Toy Story (1995)|      3.89|       76813|         0.93|[Adventure, Anima...|[{1990S, 1}, {200...|1995|       YO|
+--------+----------------+----------+------------+-------------+--------------------+--------------------+----+...


In [90]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA

import java.sql.Timestamp

val expectedRow = Row("179479",
                      "Samadhi Part 1: Maya, the Illusion of the Self",
                      4.19,
                      8,
                      0.75,
                      Seq("Documentary"),
                      Seq(Row("EASTERN PHILOSOPHY", 1),
                          Row("MEDITATION", 1),
                          Row("METAPHYSICAL", 1),
                          Row("NEW AGE", 1),
                          Row("SPIRITUAL", 1)),
                      2017,
                      "YT")

val expectedCountByYearType = Seq(Row("YO", 79235L),
                                  Row("YR", 159L),
                                  Row("YRT", 55L),
                                  Row("YT", 28L))

val expectedColumns = Seq("movie_id","title","avg_rating","count_rating","stddev_rating","genres","tag_count","year","year_type")

val expectedSchema = "movie_id STRING,title STRING,avg_rating DOUBLE,count_rating BIGINT,stddev_rating DOUBLE,genres ARRAY<STRING>,tag_count ARRAY<STRUCT<tag: STRING, count: BIGINT>>,year INT,year_type STRING"

assert(moviesDf.columns.diff(expectedColumns).size + expectedColumns.diff(moviesDf.columns).size == 0)
assert(moviesDf.filter(f.col("year").isNull).count() == 0)
assert(moviesDf
    .select(expectedColumns.map(f.col):_*)
    .filter(f.col("movie_id") === "179479")
    .collect()(0) == expectedRow)
assert(schema_to_ddl(moviesDf.select(expectedColumns.map(f.col):_*)) == expectedSchema)

moviesDf.groupBy(f.col("year_type")).count().orderBy(f.col("count").desc).collect()
    .zip(expectedCountByYearType)
    .foreach(tuple => {
        assert(tuple._1.getAs[String]("year_type") == tuple._2.getAs[String](0))
        assert(tuple._1.getAs[Long]("count") == tuple._2.getAs[Long](1))
    })

expectedSchema = movie_id STRING,title STRING,avg_rating DOUBLE,count_rating BIGINT,stddev_rating DOUBLE,genres ARRAY<STRING>,tag_count ARRAY<STRUCT<tag: STRING, count: BIGINT>>,year INT,year_type STRING


expectedRow: org.apache.spark.sql.Row = [179479,Samadhi Part 1: Maya, the Illusion of the Self,4.19,8,0.75,List(Documentary),List([EASTERN PHILOSOPHY,1], [MEDITATION,1], [METAPHYSICAL,1], [NEW AGE,1], [SPIRITUAL,1]),2017,YT]
expectedCountByYearType: Seq[org.apache.spark.sql.Row] = List([YO,79235], [YR,159], [YRT,55], [YT,28])
expectedColumns: Seq[String] = List(movie_id, title, avg_rating, count_rating, stddev_rating, genres, tag_count, year, year_type)


movie_id STRING,title STRING,avg_rating DOUBLE,count_rating BIGINT,stddev_rating DOUBLE,genres ARRAY<STRING>,tag_count ARRAY<STRUCT<tag: STRING, count: BIGINT>>,year INT,year_type STRING

In [91]:
// NO MODIFICAR EL CONTENIDO DE ESTA CELDA
val dfs = Seq((moviesDf, "08/movies"))

writeTmpDf(dfs)

dfs = List(([movie_id: string, title: string ... 7 more fields],08/movies))


List(([movie_id: string, title: string ... 7 more fields],08/movies))