## Canciones en Spotify entre 1958 y 2019

Spotify, en su conjunto, tiene una biblioteca de 30 millones de canciones. No solo contiene estas canciones, sino que también define y documenta los atributos de cada una de estas canciones (el tipo de atributos se aclarará a continuación). Esta es una cantidad increíble de información que requiere una arquitectura de Big Data para ser utilizada correctamente. Y es fundamental contar con esta arquitectura porque existe un inmenso potencial para la monetización de estos datos dentro de la industria de la música.

El conjunto de datos final que se debe utilizar en el análisis contiene 277.965 filas y 30 columnas para 24.000 canciones. La clave primaria de los datos es una combinación de WeekID y SongID.

* url: la URL de la canción en el sitio de la cartelera.
* WeekID: la semana para la que se registra el rendimiento de la canción
* Week Position: la posición que ocupó la canción durante esa semana (varía de 1 a 100)
* Song: título de la canción
* Performer: nombre del intérprete
* SongID: identificador de Billboard para la canción (concatenación de columnas anteriores)
* Instance: número de veces que se ha producido
* Previous Week Position: posición de la semana pasada
* Peak Position: la mejor posición que jamás haya ocupado
* Weeks on Chart: cuántas semanas ha estado en el top 100
* top10_tag: si la posición ocupada estaba dentro del top 10
* spotify_genre: género personalizado de Spotify (lista por canción)
* spotify_track_id: identificación alfanumérica
* spotify_track_preview_url: versión preliminar breve de la URL de la pista
* spotify_track_album – Álbum al que pertenece la canción
* spotify_track_explicit: registro de etiquetas si la pista contiene lenguaje explícito o no
* spotify_track_duration_ms: duración de la canción en milisegundos
* spotify_track_popularity: qué tan popular es la pista en Spotify
* danceability: métrica que define la probabilidad de que la canción se pueda bailar
* energy: Métrica que define qué tan enérgica es la canción
* key: tonalidad musical en la que se reproduce la canción
* loudness: métrica que define el volumen de la canción
* mode: pregrabación estéreo o mono
* speechiness: Métrica que define cuánto canto vs instrumento hay en la canción
* acousticness: métrica que define el grado en que se utilizan los instrumentos acústicos.
* instrumentalness: métrica que define cuánto de la canción utiliza instrumentos
* liveness: métrica que define el estado de ánimo de una canción
* valence: Métrica que define la positividad de la canción
* tempo: Métrica que define el tempo de la canción
* time_signature: métrica para etiquetar el tiempo de la canción

Luis Rodrigo Del Rincon Lagarde

# INSTRUCCIONES

En cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a `None`. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable).

**No olvides borrar la línea *raise NotImplementedError()* de cada celda cuando hayas completado la solución de esa celda y quieras probarla**.

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

*No olvides todas las sentencias import necesarias. Una celda que no pueda ser ejecutada por faltar import no será evaluada*

**Nunca se debe redondear ninguna cantidad si no lo pide explícitamente el enunciado**

### Cada solución debe escribirse obligatoriamente en la celda habilitada para ello. Cualquier celda adicional que se haya creado durante el desarrollo deberá ser eliminada.

Si necesitas crear celdas auxiliares durante el desarrollo, puedes hacerlo pero debes asegurarte de borrarlas antes de entregar el notebook.

### Sobre el dataset anterior (spotify_dataset.csv) se pide:

**Ejercicio 1 (1 punto)** Leerlo tratando de que Spark infiera el tipo de dato de cada columna, sin cachearlo.
* Puesto que existen columnas que contienen una coma enmedio del valor, en esos casos los valores vienen entre comillas dobles. Spark ya contempla esta posibilidad y puede leerlas adecuadamente **si al leer le indicamos las siguientes opciones adicionales** además de las que ya sueles usar: `.option("quote", "\"").option("escape", "\"")`.
* Asegúrate de que las **filas que no tienen el formato correcto sean descartadas**, indicando también la opción `mode` con el valor `DROPMALFORMED` como vimos en clase.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR LAS VARIABLES

spotify_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.option("mode", "DROPMALFORMED") \
.load("abfss://datos@masterml001sta.dfs.core.windows.net/spotify_dataset.csv")


In [None]:
assert(spotify_df.count() == 277965)
assert(spotify_df.is_cached == False)

**Ejercicio 2 (2 puntos)** Limpiar y enriquecer las variables aplicando las siguientes transformaciones encadenadas a `spotify_df`:
* Reemplazar la columna `WeekID` por su versión convertida a tipo fecha (DateType). Esto se hará en una sola línea:
  * Primero se convierte en una columna de Timestamp  con `F.to_date('nombreColumna', 'dd-MM-yyyy')`
  * En esa misma línea, dicha columna debe ser convertida a tipo DateType usando el método de la clase Column visto en clase.
* Crear una nueva columna `title_length` con la longitud (número de caracteres) del título de cada canción. PISTA: consulta la [documentación oficial](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.length.html) de la función `F.length`.
* Reemplazar la columna `Performer` por el resultado de *cortar cada título por el string* `" & "` (el carácter & precedido y seguido de un espacio en blanco), lo cual te devolverá una columna de vectores de string. PISTA: consulta la [documentación oficial](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.split.html) de la función `F.split`. En el caso de no haber ningún & el vector devuelto tendrá 1 elemento.
* Crear una nueva columna `n_performers` con el número de intérpretes de una canción. Para ello, debes *contar el número de elementos* de cada vector de la columna `Performer` y [la documentación](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.size.html) de la función `F.size` (ojo, no confundir con `length` utilizada antes). Ambas se pueden concatenar ya que actúan sobre un objeto Column y devuelven otro objeto Column. No te preocupes por los elementos nulos que hubiese en esta columna.
* Añadir dos columnas `year` y `month` que contengan respectivamente el año y el mes al que corresponde la fecha de la columna `WeekID`. PISTA: consulta la documentación oficial de [la función F.year](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.year.html) y de [la función F.month](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.month.html).
* Guardar el resultado de las tres transformaciones anteriores en una nueva variable `spotify_enriquecido_df` y **cachearla**.

* Guardar en la variable `fila_mas_autores` el objeto Row de la canción con el **mayor número de intérpretes**, deshaciendo los posibles empates en primer lugar por la que tenga la **menor longitud en el título** y, si sigue habiendo empates, por la que tenga la **semana más reciente** (cuidado: los tres criterios de ordenación son independientes y contrapuestos. Recuerda las funciones `asc()` y `desc()` de los objetos columna). PISTA: utilizar la acción `first()` tras ordenar el dataframe `spotify_enriquecido_df`, el cual no está ni debe quedar ordenado, puesto que la versión ordenada de dicho dataframe no debe reemplazar a dicha variable, sino que puedes usar otra variable auxiliar para guardarla y aplicar `first()` sobre ella.
* Accediendo a los campos del objeto anterior, almacenar en la variable `titulo_mas_autores` y `url_mas_autores` los campos del título de la canción y su URL, respectivamente.

In [None]:
from pyspark.sql import functions as F
spotify_enriquecido_df = spotify_enriquecido_df = spotify_df \
    .withColumn("WeekId", F.to_date(F.col("WeekID"), "dd/MM/yyyy"))\
    .withColumn("title_length", F.length("Song")) \
    .withColumn("Performer", F.split("Performer", " & ")) \
    .withColumn("n_performers", F.size("Performer")) \
    .withColumn("year", F.year("WeekID")) \
    .withColumn("month", F.month("WeekID")) \
    .cache()
fila_mas_autores = spotify_enriquecido_df.orderBy(
    F.desc("n_performers"),
    F.asc("title_length"),
    F.desc("WeekId")
 ).first()
titulo_mas_autores = fila_mas_autores['Song'] if fila_mas_autores else None
url_mas_autores = fila_mas_autores['url'] if fila_mas_autores else None

In [None]:
from pyspark.sql import functions as F

tipos = dict(spotify_enriquecido_df.dtypes)
assert(tipos["title_length"] == "int" and
       tipos["Performer"] == "array<string>" and
       tipos["n_performers"] == "int")
assert(sum([1 for x in ["title_length", "n_performers", "year", "month"] if x in spotify_enriquecido_df.columns]) == 4)
assert(fila_mas_autores.n_performers == 3)
assert(fila_mas_autores.url == "http://www.billboard.com/charts/hot-100/1998-07-11")
r = spotify_enriquecido_df.select(F.mean("n_performers").alias("n_performers"),
                                  F.mean("title_length").alias("title_length")).first()
assert(round(r.n_performers, 2) == 1.07)
assert(round(r.title_length, 2) == 15.59)


**Ejercicio 3 (1.5 puntos)** Partiendo del DataFrame `spotify_enriquecido_df` se pide:

* Crear un nuevo DataFrame que tenga tantas filas como años distintos aparecen, y tantas columnas como meses del año más una (que es el año y debe estar a la izquierda del todo). Las columnas deben aparecer de izqda a dcha del 1 a 12 -recuerda que el método `select()` te las devuelve en el orden que se las pidas-. En cada casilla de dicho DF debe aparecer el *número de canciones* ***distintas*** que han estado durante ese mes en algún momento en el Top 10. Se deben contar canciones distintas atendiendo a la columna `SongID`.
  * PISTA: el punto de partida debe ser el DF formado solamente por aquellas semanas que están en el top10. A partir de ahí debes construir la tabla anterior. Recuerda qué función de `pyspark.sql.functions` te sirve para contar el número de elementos **distintos** de una columna o en este caso, de un grupo.
  * PISTA: examina los valores posibles de la columna `top10_tag` para deducir con cuáles debes quedarte como punto de partida.
* Rellenar los valores nulos del DataFrame resultante con 0.
* Tras rellenarlos, añadir una nueva columna `n_top10_promedio` que contenga para cada año (fila) el número medio (redondeado a 2 cifras decimales con la función `F.round`) de canciones distintas que hay en todos los meses de ese año.
  * PISTA: esta operación se realiza exclusivamente mediante operaciones aritméticas con columnas. No se necesita F.mean ni F.sum ni nada parecido. La función `F.round` se debe aplicar al objeto columna resultante de la operación aritmética.
* Las filas del DataFrame resultante deben quedar ordenadas de menor a mayor año.
* Guardar el resultado de las transformaciones anteriores en una nueva variable `distintos_top10_df.

In [None]:
from pyspark.sql import functions as F

top10_weeks_df = spotify_df.filter(spotify_df['top10_tag'] == "Top 10")

distinct_songs_df = top10_weeks_df.groupBy(F.year('WeekID').alias('year'), F.month('WeekID').alias('month'))\
    .agg(F.countDistinct('SongID').alias('distinct_songs'))
filled_df = distinct_songs_df.fillna(0)


yearly_avg_df = filled_df.groupBy('year')\
    .agg(F.round(F.avg('distinct_songs'), 2).alias('n_top10_promedio'))

result_df = yearly_avg_df.orderBy('year')

result_df.display()
filled_df.display()
yearly_avg_df.display()
distinct_songs_df.display()
distintos_top10_df = result_df
distintos_top10_df.display()


year,n_top10_promedio
1958,12.56
1959,21.25
1960,24.58
1961,24.67
1962,22.75
1963,23.5
1964,23.58
1965,24.67
1966,29.08
1967,26.58


year,month,distinct_songs
1967,8,31
1997,11,17
1990,7,23
1998,2,20
1987,10,26
1980,8,28
1977,5,39
1963,3,22
2012,10,26
1966,6,22


year,n_top10_promedio
1959,21.25
1990,25.67
1975,25.25
1977,26.83
2003,23.0
2007,21.67
2018,22.5
1974,24.67
2015,21.17
2006,24.5


year,month,distinct_songs
1967,8,31
1997,11,17
1990,7,23
1998,2,20
1987,10,26
1980,8,28
1977,5,39
1963,3,22
2012,10,26
1966,6,22


year,n_top10_promedio
1958,12.56
1959,21.25
1960,24.58
1961,24.67
1962,22.75
1963,23.5
1964,23.58
1965,24.67
1966,29.08
1967,26.58


In [None]:

lista = distintos_top10_df.take(5)
assert(lista[0].year == 1958 and lista[0]["12"] == 12 and lista[0].n_top10_promedio == 9.42)
assert(lista[2].year == 1960 and lista[2]["4"] == 21 and lista[2].n_top10_promedio == 24.58)

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m/databricks/spark/python/pyspark/sql/types.py:3090[0m, in [0;36mRow.__getitem__[0;34m(self, item)[0m
[1;32m   3087[0m [38;5;28;01mtry[39;00m:
[1;32m   3088[0m     [38;5;66;03m# it will be slow when it has many fields,[39;00m
[1;32m   3089[0m     [38;5;66;03m# but this will not be used in normal cases[39;00m
[0;32m-> 3090[0m     idx [38;5;241m=[39m [38;5;28mself[39m[38;5;241m.[39m__fields__[38;5;241m.[39mindex(item)
[1;32m   3091[0m     [38;5;28;01mreturn[39;00m [38;5;28msuper[39m(Row, [38;5;28mself[39m)[38;5;241m.[39m[38;5;21m__getitem__[39m(idx)

[0;31mValueError[0m: '12' is not in list

During handling of the above exception, another exception occurred:

[0;31mPySparkValueError[0m                         Traceback (most recent call last)
File [0;32m<command-3

**Ejercicio 4 (1.5 puntos)** Partiendo de nuevo del DataFrame almacenado en `spotify_enriquecido_df` se pide:

* Reemplazar la columna `top10_tag`, que actualmente son "Top 10" y "Other" (literalmente) por una versión recategorizada (usando `F.when`) cuyos valores sean los *números enteros* 1 y 0, respectivamente (es decir, Top 10 -> 1 y Other -> 0). No es necesario usar F.lit, aunque puedes hacerlo.
* Crear una nueva columna `semanas_top10` que contenga para cada canción el número **total** de semanas que ha estado en el Top 10 **en ese mismo año** al que corresponde la fila. Está prohibido utilizar JOIN. Debe resolverse obligatoriamente creando una ventana en la  variable `w` (ATENCIÓN: hay que contar para **cada canción** y **ese mismo año**).
  * PISTA: vas a necesitar la columna `top10_tag` que has recategorizado como enteros.
  * PISTA: recuerda importar la función Window de `pyspark.sql`.
* A continuación, seleccionar las columnas `SongID`, `Performer` y `semanas_top10` y quitar duplicados.
* Ordenar el DataFrame resultante descentemente en base a `semanas_top10` y desempatar descendentemente en base a `SongID` (Spark utilizará orden alfabético al ser de tipo string).
* Guardar el resultado de las transformaciones anteriores en la variable `semanas_top10_df`.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy("SongID", "year")
semanas_top10_df = spotify_enriquecido_df.withColumn(
    "top10_tag", F.when(F.col("top10_tag") == "Top 10", 1).otherwise(0)
).withColumn(
    "semanas_top10", F.sum("top10_tag").over(w)
).select(
    "SongID", "Performer", "semanas_top10"
).distinct().orderBy(
    F.col("semanas_top10").desc(), F.col("SongID").desc()
)

semanas_top10_df.display()

SongID,Performer,semanas_top10
Bad GuyBillie Eilish,List(Billie Eilish),30
Uptown Funk!Mark Ronson Featuring Bruno Mars,List(Mark Ronson Featuring Bruno Mars),28
That's What I LikeBruno Mars,List(Bruno Mars),28
Girls Like YouMaroon 5 Featuring Cardi B,List(Maroon 5 Featuring Cardi B),27
Party Rock AnthemLMFAO Featuring Lauren Bennett & GoonRock,"List(LMFAO Featuring Lauren Bennett, GoonRock)",26
Old Town RoadLil Nas X Featuring Billy Ray Cyrus,List(Lil Nas X Featuring Billy Ray Cyrus),26
God's PlanDrake,List(Drake),26
Truly Madly DeeplySavage Garden,List(Savage Garden),25
Trap QueenFetty Wap,List(Fetty Wap),25
Lucid DreamsJuice WRLD,List(Juice WRLD),25


In [None]:
lista = semanas_top10_df.take(4)
assert(lista[0].semanas_top10 == 30 and "Bad Guy" in lista[0].SongID)
assert(lista[1].semanas_top10 == 28)
assert(lista[2].semanas_top10 == 28)
assert(lista[3].semanas_top10 == 27 and "Girls Like You" in lista[3].SongID)

**Ejercicio 5 (1 punto)** Partiendo del DataFrame almacenado en `semanas_top10_df`:

* Reemplazar la columna `Performer` por el resultado de *explotarla* utilizando la [función](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html) `F.explode(...)`, esto es,  desenrollar los vectores que hay en dicha columna, de manera que cada valor del vector pase a tener su propia fila completa. Por ejemplo si una casilla de esa columna contiene el vector \["Beyonce", "Pitbull"\] porque hayan colaborado juntos en alguna canción, la operación `F.explode` sobre esa columna dará lugar a dos filas en el DataFrame resultante, que serán idénticas entre sí en todas las columnas excepto en la columna `Performer`, donde una de las dos filas tendrá el valor "Beyonce" y la otra fila tendrá el valor "Pitbull". El DataFrame resultante tendrá probablemente bastantes más filas que el original.
* A continuación, encadenando una transformación aplicada al DF devuelto por el apartado anterior, crear un nuevo DataFrame con tantas filas como intérpretes distintos y dos columnas, que sean el nombre dintérprete y el número **total** de semanas que sus canciones han estado en el top 10, llamando a dicha columna `total_semanas_top10` (poniendo sobre la marcha en el momento de crearla).
* Ordenar dicho DataFrame descendentemente en base a `total_semanas_top10`, y guardar el resultado en una variable `top_performers_df`
* Utilizar una acción que lleve al Driver, a la variable `top_performers`, una lista de Python con el top 3 de estos artistas. ¿Te suenan...?

In [None]:
from pyspark.sql import functions as F
top_performers_df = semanas_top10_df \
    .withColumn("Performer", F.explode("Performer")) \
    .groupBy("Performer") \
    .agg(F.sum("semanas_top10").alias("total_semanas_top10")) \
    .orderBy(F.col("total_semanas_top10").desc())

top_performers = top_performers_df.limit(3).collect()


for performer in top_performers:
    print(performer)
top_performers_df.display()

Row(Performer='Mariah Carey', total_semanas_top10=208)
Row(Performer='Madonna', total_semanas_top10=205)
Row(Performer='The Beatles', total_semanas_top10=182)


Performer,total_semanas_top10
Mariah Carey,208
Madonna,205
The Beatles,182
Bruno Mars,157
Rihanna,155
Usher,145
Michael Jackson,141
Taylor Swift,140
Whitney Houston,133
Elton John,132


In [None]:
assert(top_performers_df.count() == 7992)
assert(top_performers[0].Performer == "Mariah Carey" and top_performers[0].total_semanas_top10 == 208)
assert(top_performers[1].Performer == "Madonna" and top_performers[1].total_semanas_top10 == 205)
assert(top_performers[2].Performer == "The Beatles" and top_performers[2].total_semanas_top10 == 182)

Este ejercicio y los dos siguientes plantean un algoritmo de **clustering (K-means)** sobre las canciones, basándonos solamente en los atributos numéricos que describen los parámetros de cada canción. Concretamente, usaremos solamente las siguientes 10 columnas:

* danceability: métrica que define la probabilidad de que la canción se pueda bailar
* energy: Métrica que define qué tan enérgica es la canción
* loudness: métrica que define el volumen de la canción
* mode: pregrabación estéreo o mono (tiene dos valores: 0 y 1)
* speechiness: Métrica que define cuánto canto vs instrumento hay en la canción
* acousticness: métrica que define el grado en que se utilizan los instrumentos acústicos.
* instrumentalness: métrica que define cuánto de la canción utiliza instrumentos
* liveness: métrica que define el estado de ánimo de una canción
* valence: Métrica que define la positividad de la canción
* tempo: Métrica que define el tempo de la canción

Puesto que K-means con distancia euclídea requiere que todas las variables estén aproximadamente en el mismo rango para que todas influyan por igual en la distancia, primero vamos a reescalar todas las variables anteriores mediante un MinMaxScaler de Spark. Eso significa que tras el reescalado, todas tendrán valores entre 0 y 1. Como el MinMaxScaler requiere una columna de tipo vector como entrada porque es capaz de reescalar de una sola vez muchas features, construiremos **antes del reescalado** una columna de tipo vector con las features originales. La salida del reescalado sigue siendo una columna de tipo vector, tal como exige el algoritmo K-means que entrenaremos después.

**Ejercicio 6 (1 punto)** Partiendo del DataFrame `spotify_enriquecido_df`, tal como fue leído del fichero CSV, se pide:

* Ejecutar el código dado para crear el DataFrame `spotify_numeric_df`. No debes resolver nada en este momento, sino dejar este código sin modificar.
* Crear en la variable `vector_assembler` un VectorAssembler que reciba como entradas (argumento `inputCols`) la lista `cluster_vars` que ya viene creada, y como salida cree una nueva columna llamada `feats_vectorizadas`.
* Crear en la variable `minmax_scaler` un [objeto `MinMaxScaler()` de Spark](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html) que esté configurado para utilizar como entrada la columna `feats_vectorizadas`, y que cree una nueva columna de tipo vector llamada `feats_reescaladas`.
  * **NOTA IMPORTANTE**:  `MinMaxScaler` recibe como única columna de entrada una columna de tipo vector, ya que es capaz de re-escalar de una sola vez (y de manera independiente) un conjunto de columnas (o mejor dicho, una columna de tipo vector que en realidad representa varias columnas independientes ya colapsadas). Por eso en este caso particular, la penúltima pieza del pipeline, que va justo antes del algoritmo K-Means, **NO** va a ser el `vector_assembler` como suele ser habitual, sino el `minmax_scaler`, y por eso el `vector_assembler` debe ir antes de dicha pieza, ya que es quien va a crear la columna de vectores que después recibirá el `minmax_scaler` como entrada.
  * NOTA: los parámetros de valor mínimo y máximo que debemos indicar al MinMaxScaler como extremos de la columna que va a crear nueva, vienen ya fijados por defecto a 0 y 1 respectivamente (es decir, cada columna se va a re-escalar, dando lugar a otra nueva que va entre 0 y 1). Esto es justo lo que queremos para K-means, así que los dejamos sin especificar para que tomen esos valores.
* Crear en la variable `kmeans`un [estimador K-Means](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.KMeans.html) con argumento `k=10` clusters, que reciba como entrada la columna `feats_reescaladas` creada en la etapa del vector assembler, y genere como salida (argumento `predictionCol`) una nueva columna llamada `cluster`, que haga como máximo 200 iteraciones (argumento `maxIter`) y que tenga semilla 12345 (argumento `seed`, imprescindible para que coincidan los resultados de los tests).
  * A priori desconocemos el número de clusters que necesitamos, que no tiene por qué coincidir con el número de géneros musicales ya que también estamos juntando datos de muchos años diferentes y esto también influye. Deberíamos hacer un análisis con regla del codo y tratar de interpretar los clusters para varios valores de k distintos hasta quedar contentos con un valor de k, pero para simplificar, omitiremos este proceso.

In [None]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.clustering import KMeans           # recuerda el import necesario para el algoritmo KMeans

cluster_vars = ["danceability", "energy", "loudness", "mode", "speechiness", "acousticness", "instrumentalness",
                "liveness", "valence", "tempo"]

spotify_numeric_df = spotify_enriquecido_df\
    .groupBy("SongID").agg(*[F.first(c).alias(c) for c in ["year", "spotify_track_duration_ms"] + cluster_vars])\
    .distinct()\
    .cache()
vector_assembler = VectorAssembler(inputCols=cluster_vars, outputCol="feats_vectorizadas")
minmax_scaler = MinMaxScaler(inputCol="feats_vectorizadas", outputCol="feats_reescaladas")
kmeans = KMeans(featuresCol="feats_reescaladas", predictionCol="cluster", k=10, maxIter=200, seed=12345)
spotify_numeric_df.count()


23751

In [None]:
assert(spotify_numeric_df.count() == 23751)
assert(len(spotify_numeric_df.columns) == 13)
assert(minmax_scaler.getInputCol() == "feats_vectorizadas")
assert(minmax_scaler.getOutputCol() == "feats_reescaladas")
assert(vector_assembler.getInputCols() == cluster_vars)
assert(kmeans.getK() == 10 and kmeans.getFeaturesCol() == "feats_reescaladas"
       and kmeans.getPredictionCol() == "cluster" and kmeans.getMaxIter() == 200
       and kmeans.getSeed() == 12345)

**Ejercicio 7 (2 puntos)**
* Crear un pipeline en la variable `pipeline` que tenga las tres etapas anteriores (el vector_assembler, el minmax_scaler y el kmeans) por ese orden.
* Entrenar el pipeline sobre el DF `spotify_numeric_df` y guardar el resultado (pipeline entrenado) en la variable `pipeline_model`.
* Aplicar el pipeline entrenado para transformar el DataFrame `spotify_numeric_df` y guardar el resultado en la variable `clustered_songs_df`, que debe quedarse **cacheada**.
* Analizar el resultado obtenido mostrando métricas agregadas de las variables dentro de cada cluster. Para ello, a partir de `clustered_songs_df`, crear en la variable `metricas_clusters_pd` un nuevo DataFrame con tantas filas como clusters (10) y que tenga 13 columnas que serán:
  * La columna más a la izquierda el cluster al que corresponde la fila, que es por lo que estamos agrupando.
  * La segunda será la mediana de la columna `year` en cada grupo. Dicha columna no se utilizó para el clustering pero existe en el DF `clustered_songs_df`
  * Las siguientes 10 columnas deben ser la mediana de cada una de las 10 variables numéricas utilizadas para el cluster (excepto para "mode" que debemos usar la media y no la mediana)
  * La última columna debe ser el recuento del número de filas de cada grupo, que debe renombrarse como `n`.
* Tras el cálculo, el DataFrame resultante debe pasarse a DataFrame de **pandas**, y eso es lo que debe guardarse en `metricas_clusters_pd`.
  * Se debe usar la **mediana** de cada columna en cada grupo. Puesto que la mediana no está disponible en la API de columnas en Spark 2.4 (se incluyó en versiones posteriores) pero sí que está disponible en la API de SQL puro, tenemos que utilizar para cada agregación la expresión `F.expr("percentile_approx(nombreColumna, 0.5)").alias(nombreColumna)` que devuelve un objeto Column con la agregación y ya renombrado para llamarse igual que la columna a la que le estamos aplicando la mediana.
  * En la columna `mode` se debe usar la **media** habitual, no la mediana, también renombrada para que se llame "mode" tras aplicar la agregación.
  * NOTA: estamos haciendo clustering, con lo que no hay ninguna división en train y test ni nada parecido! :-)

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vector_assembler, minmax_scaler, kmeans])

pipeline_model = pipeline.fit(spotify_numeric_df)

clustered_songs_df = pipeline_model.transform(spotify_numeric_df).cache()

metricas_clusters_df = clustered_songs_df.groupBy("cluster") \
    .agg(
        F.expr("percentile_approx(year, 0.5)").alias("year"),
        *[F.expr(f"percentile_approx({col}, 0.5)").alias(col) if col != "mode" else F.mean(col).alias(col) for col in cluster_vars],
        F.count("*").alias("n")
    ).orderBy("cluster")

metricas_clusters_pd = metricas_clusters_df.toPandas()

In [None]:
assert(metricas_clusters_pd.shape == (10, 13))
columnas = ["cluster", "year", "n"] + cluster_vars   # todas estas columnas tienen que existir en metricas_clusters_pd
assert(sum([1 for x in columnas if x in metricas_clusters_pd.columns]) == 13)