# Notebook de tareas del portfolio

El presente notebook tiene como fin el desarrollo de las tareas asignadas en el portfolio.

El portfolio es un documento vivo, en el cual se irán incrementando los ejercicios a medida que se vean las temáticas para resolverlo, por lo que debéis estar pendientes a dicho notebook.

# Ejercicio 1:

Revise los procedimientos que se hicieron en los notebooks:

- Conteo de palabras mediante creación de un [datos en rdd](https://github.com/ssanchezgoe/viu_big_data/blob/main/notebooks/s03_databricks_codigo_wordcount_rdd.dbc)
- Conteo de palabras mediante creación de un [datos en dataframe](https://github.com/ssanchezgoe/viu_big_data/blob/main/notebooks/s03_wordcount_datarame.dbc)
- Conteo de palabras mediante creación de un [datos en sql](https://github.com/ssanchezgoe/viu_big_data/blob/main/notebooks/s03_wordcount_sql.dbc) 

Nota: Para abrir estos notebooks, descárgelos y súbalos a databricks para poderlos abrir. Una vez en ellos, verás que se realiza un conteo sobre un archivo interno de databricks que está en la ruta:


```
/databricks-datasets/README.md
```

Realice el mismo ejercicio, pero, en lugar de usar el archivo `README.md`, realice los siguientes pasos:

- Suba un archivo de texto con un escrito cualquiera.
- Repita los procedimientos realizados en cada uno de los tres notebooks antes mencionados para realizar un conteo de palabras usando `RDD`, `DataFrame` y `datos en SQL`.
- Explicad cada una de las líneas usadas en los notebooks.

In [0]:
#Texto guardado en un archvo .txt

texto = "La creatividad es una herramienta poderosa que influye en todos los aspectos de nuestra vida cotidiana. No se limita únicamente al arte o la música, sino que se manifiesta en la forma en que resolvemos problemas, tomamos decisiones o nos comunicamos con los demás. Ser creativo no significa tener habilidades técnicas sobresalientes, sino encontrar nuevas maneras de ver el mundo, combinar ideas y afrontar los desafíos con una mente abierta.En el trabajo, la creatividad permite encontrar soluciones innovadoras, optimizar procesos y mejorar la productividad. En el ámbito personal, ayuda a expresar emociones, fortalecer relaciones y generar bienestar emocional. Incluso en tareas rutinarias, un toque de creatividad puede transformar lo monótono en algo más interesante y satisfactorio. Además, fomentar la creatividad contribuye al crecimiento personal. Nos anima a cuestionar, experimentar y aprender constantemente. A través de la imaginación, también conectamos con nuestra niñez, recordando que jugar, soñar y explorar son partes esenciales de nuestra humanidad. Por estas razones, cultivar la creatividad a diario leyendo, escribiendo, observando o simplemente permitiéndonos pensar diferente— es un acto de autodescubrimiento y transformación constante"

path = "/FileStore/tables/documento_big_data.txt"

#escribimos el archivo usando dbutils
dbutils.fs.put(path, texto, overwrite = True)

print("Archivo guardado en :", path)


Wrote 1280 bytes.
Archivo guardado en : /FileStore/tables/documento_big_data.txt


In [0]:
%fs ls dbfs:/FileStore/

path,name,size,modificationTime
dbfs:/FileStore/prueba.txt,prueba.txt,814,1748370038000
dbfs:/FileStore/tables/,tables/,0,0


#### Importamos los datos y procesamos las las primeras lineas de nuestro archivo creado:

In [0]:
#Leemos los datos; necesito importar los datos a un archivo de texto. 
lineas = sc.textFile("/FileStore/tables/documento_big_data.txt")
#Mostramos las primeras diez filas ´
for linea in lineas.take(10):
    print(linea)

La creatividad es una herramienta poderosa que influye en todos los aspectos de nuestra vida cotidiana. No se limita únicamente al arte o la música, sino que se manifiesta en la forma en que resolvemos problemas, tomamos decisiones o nos comunicamos con los demás. Ser creativo no significa tener habilidades técnicas sobresalientes, sino encontrar nuevas maneras de ver el mundo, combinar ideas y afrontar los desafíos con una mente abierta.En el trabajo, la creatividad permite encontrar soluciones innovadoras, optimizar procesos y mejorar la productividad. En el ámbito personal, ayuda a expresar emociones, fortalecer relaciones y generar bienestar emocional. Incluso en tareas rutinarias, un toque de creatividad puede transformar lo monótono en algo más interesante y satisfactorio. Además, fomentar la creatividad contribuye al crecimiento personal. Nos anima a cuestionar, experimentar y aprender constantemente. A través de la imaginación, también conectamos con nuestra niñez, recordando q

#### Conteo de palabras en RDDs (Resilient Dsitributed Dataset)


In [0]:
#Empleamos las funciones de orden superior flatMap y map para contar las palabras
#y también haremos uso de las funciones lambda:

#Separamos las palabras por comillas ""
words = lineas.flatMap(lambda s: s.split(" "))

#Aplicamos la función map para asignarle a cada palabra el entero 1
word_tuples = words.map(lambda s: (s, 1))

#Tomamos las tuplas de palabras ya creadas y las agrupamos por la clave mediante reduceByKey, para posteriormenete aplicar la función lambda y contar cuantas veces aparece la palabra e ir de forma recursiva sumando los enteros asociados a cada palabra
word_count = word_tuples.reduceByKey(lambda x, y:  x + y)

#Tomamos las primeras diez palabras  
word_count.take(10)

#Gauradamos el archivo de las palabras contadas 
word_count.saveAsTextFile("/tmp/wordcount_nuevo.txt")

# Mostrar las primeras 10 palabras con su conteo
for word, count in word_count.take(30):
    print(f"{word}: {count}")

que: 4
influye: 1
en: 5
todos: 1
los: 3
aspectos: 1
vida: 1
limita: 1
únicamente: 1
al: 2
arte: 1
manifiesta: 1
problemas,: 1
decisiones: 1
comunicamos: 1
con: 3
Ser: 1
no: 1
técnicas: 1
sobresalientes,: 1
el: 3
y: 7
mente: 1
soluciones: 1
innovadoras,: 1
procesos: 1
mejorar: 1
productividad.: 1
En: 1
ámbito: 1


#### Conteo de palabras en Dataframes 

In [0]:
#Importamos las funciones que vamos a emplear:
from pyspark.sql.functions import split, explode

#Leemos el archivo que contiene el texto
linesDf = spark.read.text("/FileStore/tables/documento_big_data.txt")

#Separamos cada línea en palabras las cuales estarán entre comillas:
lista_palabras_Df = linesDf.select(split("value", " ").alias("words"))

#Convertimos las listas de palabras en filas individuales:
palabras_Df = lista_palabras_Df.select(explode("words").alias("word"))

#Aplicamos la función count() para relizar el conteo de palabras que han sido agrupadas aplicando groupBy():
conteo_palabra_Df = palabras_Df.groupBy("word").count()

#Mostramos el conteo:
conteo_palabra_Df.show()

#Lo reescribimos todo en un archivo nuevo y lo guardamos como un csv:
conteo_palabra_Df.write.csv("/tmp/wordcounts_dataframes.csv")

+---------------+-----+
|           word|count|
+---------------+-----+
|            una|    2|
|             en|    5|
|        música,|    1|
|      bienestar|    1|
|     decisiones|    1|
|            Ser|    1|
|   experimentar|    1|
|    simplemente|    1|
|sobresalientes,|    1|
|        generar|    1|
|      optimizar|    1|
|          puede|    1|
|   innovadoras,|    1|
|          soñar|    1|
|           vida|    1|
|            ver|    1|
|         partes|    1|
|     abierta.En|    1|
|      constante|    1|
|             En|    1|
+---------------+-----+
only showing top 20 rows



In [0]:
#Mostramos los resultados del conteo de palabras en Dataframes:
conteo_palabra_Df.display()

word,count
una,2
en,5
"música,",1
bienestar,1
decisiones,1
Ser,1
experimentar,1
simplemente,1
"sobresalientes,",1
generar,1


#### Conteo de palabras con SQL

In [0]:
%sql
DROP TABLE IF EXISTS word_counts;                   -- Eliminamos la columan de conteo de palabras si existe
CREATE TABLE word_counts (word STRING)              -- Creamos la columna de conteo de palabras con las palabras
USING csv
OPTIONS("delimiter"=" ")                            -- Definimos los delimitaddores de las palabras como los espacios entre ellas 
LOCATION "/FileStore/tables/documento_big_data.txt" -- Indicamos al dirección del archivo 

In [0]:
%sql 
SELECT word, COUNT(word) AS count
FROM word_counts
GROUP BY word

word,count
La,1


Opción 2: 

In [0]:
#Leemos el archivo como texto plano 
file_location = "/FileStore/tables/documento_big_data.txt"
lines_df = spark.read.text(file_location)

#Dividimos las lineas en palabras 
from pyspark.sql.functions import explode, split

words_df = lines_df.select(
    explode(
        split(lines_df.value, "\\s+")  # Divide por espacios en blanco
    ).alias("word")
)

#Creación de la tabla temporal de SQL
words_df.createOrReplaceTempView("words")


In [0]:
word_count = spark.sql("""
    SELECT word, COUNT(*) AS count
    FROM words
    WHERE word != ''
    GROUP BY word
    ORDER BY count DESC
""")

In [0]:
display(word_count)

word,count
y,7
la,7
de,6
en,5
creatividad,5
que,4
nuestra,3
o,3
con,3
el,3


# Ejercicio 2:

Ejercicio de DataFrames de Spark.

Vamos a practicar con Spark DataFrames respondiendo preguntas sobre datos de las acciones de Walmart entre 2012 y 2017. 

Completa las tareas que se indican a continuación.



## Carga de datos

Carga el archivo CSV de las acciones de [Walmart](https://github.com/ssanchezgoe/viu_big_data/blob/main/data/walmart_stock.csv) y permite que Spark infiera automáticamente los tipos de datos.

In [0]:
df = spark.read.csv("/FileStore/tables/walmart_stock.csv", header=True, inferSchema=True)

#InferSchema = True para poder inferir sobre el esquema:
df.printSchema()
df.show(10)

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|   

## Nombre columnas

Imprimir los nombres de las columnas del dataframe.

In [0]:
df.columns

Out[20]: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

## Esquema del dataframe
Imprimir el esquema del dataframe

In [0]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



## Visualización de columnas
Imprimir las 5 primeras columnas

In [0]:
columnas = df.columns[:5] #Imprimo las cinco primeras columnas y las guado en una variable llamada columnas.
df.select(columnas).show()

+----------+------------------+------------------+------------------+------------------+
|      Date|              Open|              High|               Low|             Close|
+----------+------------------+------------------+------------------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18|
|2012-01-10|             59.43|59.709998999999996|             58.98|59.040001000000004|
|2012-01-11|         59.060001|         59.529999|59.040001000000004|         59.400002|
|2012-01-12|59.790001000000004|              60.0|         59.400002|              59.5|
|2012-01-13|         

## Análisis estadístico. 
Use el método `describe()` para explorar información estadística del dataframe.

In [0]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

## Pregunta Extra
Hay demasiados decimales en la media y la desviación estándar del DataFrame generado por `describe()`. Formatea los números para que solo muestren dos decimales. Presta mucha atención a los tipos de datos que devuelve `.describe()`. No vimos exactamente cómo hacer este formato, pero sí algo muy similar.

In [0]:
#Para este apartado emplearemos la funcion format_number y col del paquete SQL de PySpark.

#Comenzamos viendo un resumen estadístico de los datos:
descripcion = df.describe()

#Creamos una variable con todas las columnas:
#columnas = df.columns[1:]
#df.columns
#df.describe().dtypes
#Vemos que los datos de las columnas de tipo string y necesito columnas numéricas:

#Importamos las funciones col y format_number del paquete sql de PySpark:
from pyspark.sql.functions import col, format_number

# Convertimos las columnas numéricas en el DataFrame original:
columnas_conversion = df.columns[1:]
for columna in columnas_conversion:
    df = df.withColumn(columna, col(columna).cast("double"))

#Implementamos mediante un bucle for  para asignar un formato de dos decimales a cada una de las medidas estadísticas mostradas en describe():
for columna in columnas_conversion:
    descripcion = descripcion.withColumn(columna, format_number(col(columna).cast("double"), 2))

# Mostramos el resultado:
descripcion.show()

+-------+--------+--------+--------+--------+-------------+---------+
|summary|    Open|    High|     Low|   Close|       Volume|Adj Close|
+-------+--------+--------+--------+--------+-------------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.48|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780.84|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100.00|    84.91|
+-------+--------+--------+--------+--------+-------------+---------+



## Creación de un nuevo DataFrame

Crea un nuevo DataFrame con una columna llamada HV Ratio, que sea el resultado de dividir el precio más alto (High Price) entre el volumen de acciones negociadas en un día.


In [0]:
#Importamos las funciones necesarias de PySpark SQL:
from pyspark.sql.functions import col, max

#Obtenemos el valor máximo de la columna 'High' por día, para ello aplicamos una función de orden superior (GrupBy) para agrupar los datos del mismo día:
maximo_high_dia = df.groupBy("Date").agg(max("High").alias("max_high")).collect()[0]["max_high"]

#Creamos la nueva columna 'HV Ratio' dividiendo ese valor máximo entre 'Volume' de cada fila:
df_hv_ratio = df.withColumn("HV Ratio", maximo_high_dia / col("Volume"))

# Mostramos las primeras filas con la nueva columna:
df_hv_ratio.select("High", "Volume", "HV Ratio").show(5)

df_hv_ratio.display(3)

#El resultado será:
df_hv_ratio_2 = df.withColumn("HV Ratio", col("High") / col("Volume"))
df_hv_ratio_2.display(3)

+---------+---------+--------------------+
|     High|   Volume|            HV Ratio|
+---------+---------+--------------------+
|61.060001|1.26688E7|4.889965979413993E-6|
|60.349998|9593300.0|6.457631993161894E-6|
|59.619999|1.27682E7|4.851897761626541E-6|
|59.450001|8069400.0|7.677150841450417E-6|
|59.549999|6679300.0|9.274924168700314E-6|
+---------+---------+--------------------+
only showing top 5 rows



Date,Open,High,Low,Close,Volume,Adj Close,HV Ratio
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800.0,52.619235,4.889965979413993e-06
2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300.0,52.078475,6.457631993161894e-06
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200.0,51.825539,4.851897761626541e-06
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400.0,51.45922,7.677150841450417e-06
2012-01-09,59.029999,59.549999,58.919998,59.18,6679300.0,51.616215,9.274924168700314e-06
2012-01-10,59.43,59.709999,58.98,59.040001,6907300.0,51.494109,8.968772313349645e-06
2012-01-11,59.060001,59.529999,59.040001,59.400002,6365600.0,51.808098,9.731997140882244e-06
2012-01-12,59.790001,60.0,59.400002,59.5,7236400.0,51.895316,8.560886766900669e-06
2012-01-13,59.18,59.610001,59.009998,59.540001,7729300.0,51.930204,8.014956205607234e-06
2012-01-17,59.869999,60.110001,59.52,59.849998,8500000.0,52.200581,7.288235411764706e-06


Date,Open,High,Low,Close,Volume,Adj Close,HV Ratio
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800.0,52.619235,4.819714653321546e-06
2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300.0,52.078475,6.290848613094555e-06
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200.0,51.825539,4.669412994783916e-06
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400.0,51.45922,7.367338463826307e-06
2012-01-09,59.029999,59.549999,58.919998,59.18,6679300.0,51.616215,8.915604778943901e-06
2012-01-10,59.43,59.709999,58.98,59.040001,6907300.0,51.494109,8.644477436914568e-06
2012-01-11,59.060001,59.529999,59.040001,59.400002,6365600.0,51.808098,9.351828421515644e-06
2012-01-12,59.790001,60.0,59.400002,59.5,7236400.0,51.895316,8.29141562102703e-06
2012-01-13,59.18,59.610001,59.009998,59.540001,7729300.0,51.930204,7.712212102001476e-06
2012-01-17,59.869999,60.110001,59.52,59.849998,8500000.0,52.200581,7.071764823529412e-06


## Precio máximo

¿Qué día tuvo el precio máximo más alto?


In [0]:
#Podemos implementar dos tipos de código:

#Primera opción:

#Ordenar los datos de forma descendente empezando por el mayor y tomar el primero:
dia_mayor_precio_1 = df.orderBy(col("High").desc()).select("Date", "High").first()
print(dia_mayor_precio_1)

#Segunda opción:

#Implementamos las funciones max y filter:
from pyspark.sql.functions import max

# Obtenemos el valor máximo de High:
mayor_precio = df.agg(max("High").alias("mayor_precio")).collect()[0]["mayor_precio"]

# Filtramos filas donde High sea igual al valor máximo:
df.filter(col("High") == mayor_precio).select("Date", "High").show()


Row(Date=datetime.date(2015, 1, 13), High=90.970001)
+----------+---------+
|      Date|     High|
+----------+---------+
|2015-01-13|90.970001|
+----------+---------+



## Valor medio

¿Cuál es la media de la columna Close?


In [0]:
#Cargamos comandos de SQL entre los que se encuentran average:
from pyspark.sql.functions import avg

#Obtenemos la media de la columna "Close" aplicado la función importada avg:
df.select(avg("Close").alias("Media_columna_Close")).show()

#Podría hacerlo sumando todos los valores de la columna y dividiéndolo entre el núemro de elementos totales, pero sería más costoso computacionalmente.

+-------------------+
|Media_columna_Close|
+-------------------+
|  72.38844998012726|
+-------------------+



## Valores máximos y mínimos

¿Cuál es el valor máximo y el mínimo de la columna Volume?


In [0]:
#Importamos las fuciones max y min ppara calcular el valor máximo y mínimo de una columna:
from pyspark.sql.functions import max, min
#Implementamos las funciones sobre la columna volumen:
df.select(
    max("Volume").alias("Maximo_Volumen"),
    min("Volume").alias("Minimo_Volumen")
).show()

#También podríamos ordenar la columna con la función OrderBy y después tomar el primer y el último elemento, pero es más costoso como ocurría antes, además solo queremos esos dos valores.

+--------------+--------------+
|Maximo_Volumen|Minimo_Volumen|
+--------------+--------------+
|     8.08981E7|     2094900.0|
+--------------+--------------+



## Conteo de días

¿Cuántos días el valor de Close fue inferior a 60 dólares?


In [0]:
#Empleamos la función filter() para seleccionar los datso en función de una consición impuesta por el enunciado y count() para sumar cuantos datos cumplen esa conddición, es decir el numero de elementos que se han obtenido del filter:
dias_inferiores_60 = df.filter(df["Close"] < 60).count()

# Mostramos el resultado:
print(f"Número de días que Close fue inferior a 60 dólares: {dias_inferiores_60}")

Número de días que Close fue inferior a 60 dólares: 81


## Determinación porcentaje

¿Qué porcentaje del tiempo el valor de High fue superior a 80 dólares?

En otras palabras, (Número de días con High > 80) / (Total de días en el conjunto de datos)


In [0]:
#Obtenemos el número de días con valor de la columna "High" superior a 80 dólares:
dias_mayores_80 = df.filter(df["High"] > 80).count()

#El total de días se corresponde con el número de filas, por lo tanto servirá unicamente usar: 
total_dias = df.count()

#Calulamos el porcentaje:
porcentaje = (dias_mayores_80 / total_dias) * 100

#Número con dos decimales:
porcentaje_1 = round(porcentaje, 2)
print(f"El porcentaje de tiempo donde el valor de High fue mayor a 80 dólares fue de {porcentaje_1} %" )

El porcentaje de tiempo donde el valor de High fue mayor a 80 dólares fue de 9.14 %


## Correlación

¿Cuál es la correlación de Pearson entre High y Volume?

Consultar [ayuda
](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.stat.Correlation.html).


In [0]:
#Tras consultar en enlace de ayuda hemos visto como se puede calcular la matriz de correlación y el coeficiente de Pearson que mide como estudiamos en estadística la correlación entre dos variables:

from pyspark.sql.functions import corr


#Calculamos el coeficiente de Pearson, el cual es equivalente a una de los coeficientes de la matruiz de correlación. 
#pero con la función corr() podemos calcular la correlación de las variables que buscamos, para después seleccionar el valor con la función select. El formato es un float el cual obtenemos al implmentar la función collect()[0][0]


coef_pearson = df.select(corr("High", "Volume")).collect()[0][0]
print(f"La correlación de Pearson entre High y Volume es: {coef_pearson}")

La correlación de Pearson entre High y Volume es: -0.3384326061737161


## Valor máximo por año

¿Cuál fue el valor máximo de High por año?


In [0]:
#Importamos las fuciones year y max:
from pyspark.sql.functions import year, max

# Extraemos el año de la columna 'Date' y creamos una nueva columna df_year:
df_year = df.withColumn("Year", year(df["Date"]))

# Agrupamos por año implementando la función Groupby y calculamos el valor máximo de la columna "High":
max_high_per_year= df_year.groupBy("Year").agg(max("High").alias("Max_High"))

# Mostramos el resultado:
max_high_per_year.orderBy("Year").show()

+----+---------+
|Year| Max_High|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



## Promedio por mes

¿Cuál es el promedio del valor de Close para cada mes del calendario?

Es decir, considerando todos los años, ¿cuál es el precio promedio de Close para enero, febrero, marzo, etc.? El resultado debe tener un valor para cada uno de los meses.


In [0]:
#importamos las funciones moth y avg, aunque avg la importamos previamente:
from pyspark.sql.functions import month, avg

# Agregamos una columna con el mes extraído de la fecha:
df_con_mes = df.withColumn("Month", month(df["Date"]))

# Agrupamos por mes y calculamos el promedio del valor de la columna "Close":
promedio_close_por_mes = df_con_mes.groupBy("Month").agg(avg("Close").alias("Promedio_Close"))

# Mostramos el resultado ordenado por mes:
promedio_close_por_mes.orderBy("Month").show()

+-----+-----------------+
|Month|   Promedio_Close|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



# Ejercicio 3: Regresión Lineal. Proyecto de Modelado Predictivo: Estimación de Tripulación para Barcos de Crucero

Has sido contratado por Hyundai Heavy Industries, una de las mayores compañías de construcción naval del mundo, para colaborar en la creación de un modelo predictivo para sus barcos de crucero.

Te encuentras en la sede central de la empresa en Ulsan, Corea del Sur, con el objetivo de desarrollar un modelo que proporcione estimaciones precisas sobre la cantidad de tripulantes que necesitará cada barco.

Actualmente, están en proceso de construir nuevos barcos para varios clientes, y necesitan que crees un modelo que prediga cuántos miembros de la tripulación serán necesarios.

**Descripción del conjunto de datos**

El archivo con los datos se llama [cruise_ship_info.csv](https://github.com/ssanchezgoe/viu_big_data/blob/main/data/cruise_ship_info.csv). Contiene mediciones sobre el tamaño, capacidad, tripulación y antigüedad de 158 barcos de crucero.

**Variables/Columnas:**

- **Ship Name**: Nombre del barco (posiciones 1-20)
- **Cruise Line**: Línea de cruceros (posiciones 21-40)
- **Age (as of 2013)**: Edad del barco en años, tomando 2013 como referencia (posiciones 46-48)
- **Tonnage (1000s of tons)**: Tonelaje en miles de toneladas (posiciones 50-56)
- **Passengers (100s)**: Número de pasajeros en centenas (posiciones 58-64)
- **Length (100s of feet)**: Longitud en centenas de pies (posiciones 66-72)
- **Cabins (100s)**: Número de camarotes en centenas (posiciones 74-80)
- **Passenger Density**: Densidad de pasajeros (posiciones 82-88)
- **Crew (100s)**: Número de tripulantes en centenas (posiciones 90-96)

**Objetivo**

Desarrolla un modelo de regresión para predecir cuántos tripulantes se necesitarán en barcos futuros. Ten en cuenta que el cliente ha mencionado que la **línea de cruceros** puede influir significativamente en el número de trip


Vamos a extraer los datos del archivo "cruise_ship_info.csv" para proceder al preprocesamiento de los mismos antes de elaborar nuestro modelo de regresión lineal para predecir el número de miembros de la tripulación que formarán parte del barco.

In [0]:
# Cargamos los datos 
datos = spark.read.csv("/FileStore/tables/cruise_ship_info-6.csv", inferSchema=True,header=True)

Empleamos "inferSchema = True" para que Spark detecte automaticamente el tipo de dato de cada columna y agregamos la primera fila como nombre de las columnas con "header = True"

In [0]:
#A continuación imprimimos un esquema del DataFrame que forman los datos cargados previamente, para comprender los datos con los que estamos trabajando.
datos.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



- Aquí podemos ver el tipo de datos con los que estamos trabajando y además `nullable = True`, nos indica que todas las variables o features pueden contener valores nulos. 

- Además podemos ver que todas las columnas o features poseen el tipo `double` a excepción de las columnas `ship_name` y `cruise_line` que poseen datos de tipo `string` y `Age` que posee datos de tipo `integer`.

In [0]:
#Visaulizamos el dataFrame completo:
datos.display()

Ship_name,Cruise_line,Age,Tonnage,passengers,length,cabins,passenger_density,crew
Journey,Azamara,6,30.277,6.94,5.94,3.55,42.64,3.55
Quest,Azamara,6,30.277,6.94,5.94,3.55,42.64,3.55
Celebration,Carnival,26,47.262,14.86,7.22,7.43,31.8,6.7
Conquest,Carnival,11,110.0,29.74,9.53,14.88,36.99,19.1
Destiny,Carnival,17,101.353,26.42,8.92,13.21,38.36,10.0
Ecstasy,Carnival,22,70.367,20.52,8.55,10.2,34.29,9.2
Elation,Carnival,15,70.367,20.52,8.55,10.2,34.29,9.2
Fantasy,Carnival,23,70.367,20.56,8.55,10.22,34.23,9.2
Fascination,Carnival,19,70.367,20.52,8.55,10.2,34.29,9.2
Freedom,Carnival,6,110.239,37.0,9.51,14.87,29.79,11.5


Realizando una vista rápida de los datos con la función display() vemos que **no hay valores nulos o no definidos (Null)** que puedan causar problemas. Aún así vamos a verificarlo rápidamente y a mostrar los resultados.

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# Contamos los valores nulos por columnas 
null_counts = datos.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in datos.columns])

# Imprimimos los resultados por columnas 
null_counts.show()

+---------+-----------+---+-------+----------+------+------+-----------------+----+
|Ship_name|Cruise_line|Age|Tonnage|passengers|length|cabins|passenger_density|crew|
+---------+-----------+---+-------+----------+------+------+-----------------+----+
|        0|          0|  0|      0|         0|     0|     0|                0|   0|
+---------+-----------+---+-------+----------+------+------+-----------------+----+



In [0]:
#Resumen estadístico de las columnas numéricas:
#Selecciono las columnas numéricas de mis datos, las cuales por ahora tienen tipo double o ineteger:
columnas_numericas = datos.select([c for c, t in datos.dtypes if t in ('integer', 'double')])
#Realizo el cálculo de los estadísticos básicos y los muestro:
columnas_numericas.describe().show()

+-------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|summary|           Tonnage|       passengers|           length|            cabins|passenger_density|             crew|
+-------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  count|               158|              158|              158|               158|              158|              158|
|   mean| 71.28467088607599|18.45740506329114|8.130632911392404| 8.830000000000005|39.90094936708861|7.794177215189873|
| stddev|37.229540025907866|9.677094775143416|1.793473548054825|4.4714172221480615| 8.63921711391542|3.503486564627034|
|    min|             2.329|             0.66|             2.79|              0.33|             17.7|             0.59|
|    max|             220.0|             54.0|            11.82|              27.0|            71.43|             21.0|
+-------+------------------+------------

Tras un rápido análisis de los datos de nuestro documento `cruise_ship_info.csv` vamos a  proceder a la preparación de los datos para aplicar un modelo de Machine Learning en Spark.


In [0]:
#Visualizamos las columnas del conjunto de datos
datos.columns

Out[49]: ['Ship_name',
 'Cruise_line',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew']

###Organizamos los datos en dos estructuras:

    X -> Variables independientes que nos servirán para predecir (features)
    y -> Variable dependiente que queremos predecir (label)
    
En nuesto modelo la variable a predecir será `crew` mientras que las variables que tendremos en cuenta para predecir el número de tripulantes serán todas a excepción de `Ship_name`, ya que el nombre del barco no aporta una información relevante sobre el número de tripulantes.


In [0]:
#Importamos las funciones que necesitamos usar para crear los vectores de features y label.

# Importamos las funciones VectorAssembler y Vectors:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

#Definimos un vector de ensamble el cual estará formado por los features que se emplean para estimar la variable `crew`. 

#Excluimos la columna "Ship_name" y  "crew" para  definir las columnas de feautures:

#Para hacerlo definimos las columnas que vamos a excluir: 
col_exclusion = ["Ship_name", "crew"]
#Tomamos las columnas que no están contenidas en col_exclusion:
col_features = [col for col in datos.columns if col not in col_exclusion] 
print( f"las columnas correspondientes a las features serán", col_features)

#Antes de crear el vector de ensamble el cual estará contenido en la columna de features y contendrá todas las variables predictoras debemos definir los datos de distitno tipo como datos numéricos de tipo double en este caso, pues la regresión lineal trabaja con datos continuos y no categóricas como es la columna "Cruise_line". 

#Modificaciones en los datos:  

#Vemos que el tipo de "Cruise_line" y "Age" es distinto de las demás columnas 
cols_tipo_distinto = ["Cruise_line", "Age"]

for cl in cols_tipo_distinto:
    print(f" El tipo de {cl} es : {datos.schema[cl].dataType}")

#Cambiamos el tipo de ambas columnas para que sean del mismo tipo y evitar problemas en el proceso de modelado y entrenamiento del modelo de regresión:



las columnas correspondientes a las features serán ['Cruise_line', 'Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']
 El tipo de Cruise_line es : StringType()
 El tipo de Age es : IntegerType()


In [0]:

#Conversión de "Cruise_line" a un elemento de tipo double: 

#Importamos las funciones necesarias
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

#Vamos a asignar a cada una de las lineas de crucero un valor determinado, para así poder tener un valor continuo en dicha columna, comenzamos en 1 pues he visto que si comenzamos en cero tendremos errores: 

#El método implementado se basa en indexar o añadir una columna con valores de tipo double correspondiente a la columna `Cruise_line`

#Para ello comenzamos definiendo el StringIndexer previamnete implementado de pyspark.ml.features:
indexer = StringIndexer(inputCol="Cruise_line", outputCol="Cruise_line_num_temp")

#Definimos los datos para nueva columna que vamos a indexar como una nueva columna de features:
datos_indexados = indexer.fit(datos).transform(datos)

# Modificamos el indexador para que añana nuestra nueva columna y empiece en 1 por eso le sumamos 1:
datos_indexados = datos_indexados.withColumn("Cruise_line_num", col("Cruise_line_num_temp") + 1)

# Eliminamos la columna temporal creada en indexer:
datos_indexados = datos_indexados.drop("Cruise_line_num_temp")

#Mostramos el resultado de la columan original y la que hemos indexado:
mapping = datos_indexados.select("Cruise_line", "Cruise_line_num").distinct().orderBy("Cruise_line_num")
mapping.show(truncate=False)

datos_indexados.printSchema()
datos_indexados.show()

#Asignando un valor double a cada una de las lineas de crucero, que es lo que queríamos.


#Conversión de la columna Age a tipo double en los datos indexados:
datos_indexados = datos_indexados.withColumn("Age", col("Age").cast("double"))


+-----------------+---------------+
|Cruise_line      |Cruise_line_num|
+-----------------+---------------+
|Royal_Caribbean  |1.0            |
|Carnival         |2.0            |
|Princess         |3.0            |
|Holland_American |4.0            |
|Norwegian        |5.0            |
|Costa            |6.0            |
|Celebrity        |7.0            |
|MSC              |8.0            |
|P&O              |9.0            |
|Star             |10.0           |
|Regent_Seven_Seas|11.0           |
|Silversea        |12.0           |
|Cunard           |13.0           |
|Oceania          |14.0           |
|Seabourn         |15.0           |
|Windstar         |16.0           |
|Azamara          |17.0           |
|Crystal          |18.0           |
|Disney           |19.0           |
|Orient           |20.0           |
+-----------------+---------------+

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonna

A continuación definiremos el conjuntos de las columnas para crear el **vector de ensamble** correspondiente al vector con las **features** o **características** 

In [0]:
#Para hacerlo definimos las columnas que vamos a excluir de los nuevos datos con la columna indexada:
col_exclusion = ["Ship_name", "Cruise_line","crew"]
#Hemos eliminado la columna Cruise_line porque ya tengo la nueva columna Cruise_line_num.

#Tomamos las columnas que no están contenidas en col_exclusion:
col_features = [col for col in datos_indexados.columns if col not in col_exclusion] 
print( f"las columnas correspondientes a las features serán", col_features)


las columnas correspondientes a las features serán ['Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density', 'Cruise_line_num']


Definimos el **Vector de Ensamble**.
 

In [0]:
# Importamos las funciones VectorAssembler y Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

#Definimos un vector de ensamble el cual estará formado por los features que se emplean para estimar la variable `crew`: 
assembler = VectorAssembler(inputCols=col_features, outputCol='features')

En el siguiente paso, aplicaremos el `assembler` que hemos configurado. Esto generará una nueva columna llamada `features` en nuestro DataFrame, que contendrá un vector con todas las columnas numéricas seleccionadas como variables predictoras, que será el **Vector de Ensamble**. El cual emplearemos para poder entrenar nuestro modelo y realizar predicciones sobre el número de tripulantes posteriormente.

In [0]:
#Aplicamos el assembler y tranformamos los datos ya indexados:
var_predictoras = assembler.transform(datos_indexados)
var_predictoras.printSchema()
var_predictoras.show()
var_predictoras.columns

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Cruise_line_num: double (nullable = false)
 |-- features: vector (nullable = true)

+-----------+-----------+----+------------------+----------+------+------+-----------------+----+---------------+--------------------+
|  Ship_name|Cruise_line| Age|           Tonnage|passengers|length|cabins|passenger_density|crew|Cruise_line_num|            features|
+-----------+-----------+----+------------------+----------+------+------+-----------------+----+---------------+--------------------+
|    Journey|    Azamara| 6.0|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|           17.0|[6.0,30.276999999...|
|    

Por lo tanto una vez creado creada la columna de características debemos seleccionar las columnas que necesitasmos para entrenar nuestro modelo de aprendizaje automático.

In [0]:
datos_regresion = var_predictoras.select("features",'crew')

#### Entrenamiento y diseño del modelo de Regresión Lineal para predecir el número de tripulantes de cada barco

Dividimos los datos en dos conjuntos uno de entrenamiento y otro de prueba, en nuestro caso haremos una división 70-30, es decir un 70% de los datos destinados al entrenamiento y un 30% para prueba o test.

In [0]:
train_data,test_data = datos_regresion.randomSplit([0.7,0.3])
train_data.columns

Out[56]: ['features', 'crew']

Antes de definir nuestro modelo de regresión lineal vamos a realizar un análisis estadístico sobre la división de los datos en dos conjuntos de entrenamiento y prueba. Esto lo haremos para ver si hemos dividido bien el conjunto de los datos, comprender la escala de los datos, ver la distribución de los datos, así como detectar posibles outliers o valores atípicos.

In [0]:
train_data.describe().display()
test_data.describe().display()

summary,crew
count,115.0
mean,7.964347826086968
stddev,3.601256635070593
min,0.59
max,21.0


summary,crew
count,43.0
mean,7.339069767441859
stddev,3.2233359043325702
min,0.88
max,13.6


- Siguiendo las medidas estadísticas obtenidas del conjunto de entrenamiento y prueba, podemos decir que no hay aparentemente presencia de valores atípicos pues la media y la desviación típica es casi similar en ambos conjuntos. Así como el valor máximo y nínimo de ambos conjuntos. 
- Lo único en lo que difieren es el el número de registros lo que es lógico al definir nosotros una división 70/30 para entrenamiento y prueba.
- Esto nos permite afirmar que esta división hace que no se produzca un sesgo a causa de los datos, reduciendo la probabilidad de overfitting o sobreajuste del modelo.

Una vez definida la división de datos de entrenamiento y prueba, tras estudiar los valores estadísticos de ambos conjuntos y ver que son adecuados vamos a pasar a definir nuestro modelo de regresión lineal para estimar el número de tripulantes de un barco.

In [0]:
#Importamos la función LinearRegression:
from pyspark.ml.regression import LinearRegression

# Creamos el objeto Linear Regresion Model, donde especificamos "labelCol = 'crew'" que es la variable que queremos estimar:
lr = LinearRegression(labelCol='crew')

#Entrenamos el modelo con los datos de entrenamiento:
modelo_lr = lr.fit(train_data)

Una vez entrenado el modelo vamos a estudiar los parámetros principales del mismo. Es decir los coeficientes de la regresión lineal que nos permiten ver el peso de las variables predictoras y el valor del intercepto, que es el coeficiente independiente de la regresión.

In [0]:
# Parámetros principales del modelo de Regresión Lineal 
print("Coefficients: {} Intercept: {}".format(modelo_lr.coefficients,modelo_lr.intercept))

Coefficients: [-0.01505543388557455,0.014890944261644891,-0.149505433947062,0.3970326212911475,0.8152374984841201,-0.009084173217698445,0.05427921331536826] Intercept: -0.613744488715491


- Coeficientes de la regresión: [-0.0024294383986113973,0.015717996320700926,-0.14030439108705411,0.4659684979456127,
0.7711959395279109,-0.001968124548457334,0.048032667503154214] 
- Intercepto: -1.5349628951101988


Las columnas correspondientes a las features serán: 
- `Age`
- `Tonnage`
- `passengers`
- `length`
- `cabins` 
- `passenger_density` 
- `Cruise_line_num`

Podemos ver que los coeficientes de las variables que tienen más peso a la hora de estimar el número de tripulantes de un barco serán las features correspondientes a la columna cuatro y cinco, es decir, `length` y `cabins` son las variables independientes que más influyen en la predicción del numero de tripulantes.

Por lo tanto a la hora de predecir el numero de tripulantes de un nuevo barco la **longitud del barco** y el **número de camarotes** tendrán un peso mayor según nuestro modelo.


Una vez entrenado el modelo vamos a evaluarlo en los datos de prueba para ver la capacidad generalización del modelo para datos no observados así como la calidad del modelo definido sobre estos datos no visualizados previamente.

In [0]:
#Evaluamos el modelo de regresión con los datos de prueba definidos anteriormente: 
test_results = modelo_lr.evaluate(test_data)

Calculamos los residuos que se han generado durante el entrenamiento del modelo con los datos de prueba.

In [0]:
#Mostramos los residuos
test_results.residuals.display()

residuals
-0.9470726862349164
-0.9470726862349164
0.4801457750048801
0.3579126061975834
0.3152652782631122
-0.3819921098273067
-0.6654845140937624
0.8526515987119048
-0.2661207819250863
-0.6283702160808886


Los residuos de la regresión como hemos estudiado en asignaturas de estadística deben seguir una distribución normal o similar a la misma. Deben ser independientes unos de otros y estar generados de forma aleatoria.

Una vez entrenado el modelo y analizada la calidad de los residuos generados, vamos a realizar predicciones sobre el número de tripulantes con el modelo entrenado con los datos de entrenamiento y prueba.

#### Predicciones del modelo:

In [0]:
#Definimos una variable con los datos solo de las variables predictoras:
datos_pred = test_data.select('features')
#Predecimos aplicando el modelo definido anteriormente y los datos definimos en la linea anterior 
predicciones = modelo_lr.transform(datos_pred)
#Mostramos las predicciones:
predicciones.display()

features,prediction
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 30.276999999999997, 6.94, 5.94, 3.55, 42.64, 17.0))",4.497072686234916
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 30.276999999999997, 6.94, 5.94, 3.55, 42.64, 17.0))",4.497072686234916
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 93.0, 23.94, 9.65, 11.97, 38.85, 5.0))",10.60985422499512
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 110.23899999999999, 37.0, 9.51, 14.87, 29.79, 2.0))",11.142087393802417
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 113.0, 37.82, 9.51, 15.57, 29.88, 3.0))",11.684734721736888
"Map(vectorType -> dense, length -> 7, values -> List(6.0, 158.0, 43.7, 11.25, 18.0, 36.16, 1.0))",13.981992109827306
"Map(vectorType -> dense, length -> 7, values -> List(7.0, 116.0, 31.0, 9.51, 15.57, 37.42, 3.0))",12.665484514093762
"Map(vectorType -> dense, length -> 7, values -> List(8.0, 91.0, 22.44, 9.65, 11.22, 40.55, 5.0))",10.147348401288095
"Map(vectorType -> dense, length -> 7, values -> List(9.0, 59.058, 17.0, 7.63, 8.5, 34.74, 8.0))",7.666120781925087
"Map(vectorType -> dense, length -> 7, values -> List(9.0, 90.09, 25.01, 9.62, 10.94, 36.02, 1.0))",9.318370216080888


Evaluamos la calidad de las predicciones del modelo mediante diferentes métricas de evaluación. En este caso al tener un algoritmo de aprendizaje supervisado de regresión lineal las métricas que emplearemos serán:
- Error Cuadrático Medio (MSE) 
- Raíz cuadrada del error cuadrático medio (RMSE)
- Coeficiente de Correlación (R^2 )

In [0]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R²: {}".format(test_results.r2))

RMSE: 0.6327865706694425
MSE: 0.4004188440195934
R²: 0.9605431381566595


Estos resultados de las **métricas de evaluación** nos permiten decir que nuestro modelo tiene un buen rendimiento a la hora de estimar el número de tripulantes de un barco. 
- **MSE = 0.55735** no es muy alto, lo cual nos permite afirmar que las predicciones del modelo y los datos originales no difieren mucho. 
- **R^2= 0.936** nos permite afirmar que nuestro modelo de regresión lineal es capaz de explicar un 93.6% de la varianza de los datos, lo cual es muy bueno.

Vamos a calcular y evaluar las predicciones sobre el conjunto de entrenamiento para compararalas con las predicciones realizadas sobre los datos de prueba relizadas anteriormente:

In [0]:
#Evaluamos el modelo de regresión lineal con los datos de prueba:
train_results = modelo_lr.evaluate(train_data)

print("Entrenamiento - RMSE: {}".format(train_results.rootMeanSquaredError))
print("Entrenamiento - MSE: {}".format(train_results.meanSquaredError))
print("Entrenamiento - R²: {}".format(train_results.r2))

Entrenamiento - RMSE: 1.0379917107780714
Entrenamiento - MSE: 1.0774267916439872
Entrenamiento - R²: 0.9161944816168078


Vemos que con estos resultado de las métricas de evaluación de las predicciones sobre el conjunto de entrenamiento  podemos decir que nuestro modelo es bueno y es capaz de realizar  buenas predicciones pues:

- **RMSE = 1.03** con los datos de entrenamiento y **0.76**  con los datos de prueba lo cual nos indica que el modelo no está **sobreajustado**.

- **R²= 0.92** con los datos de entrenamiento y  **0.93**  con los datos de prueba lo cual muestra que el modelo  explica gran parte de la varianza sobre ambos conjuntos.

Aunque las predicciones sobre ambos conjuntos nos hacen intuir que el modelo definido predice muy bien el número de tripulantes, para ver que hemos dividio bien el conjunto de los datos y ver que las predicciones son fiables vamos a aplicar una **Validación Cruzada**.

Vamos a implementar una Validación cruzada empleando el método de K- Folds para ver el rendimiento de nuestro modelo y ver si se dividen bien los conjuntos de entrenamiento y prueba:
    

In [0]:
#Importamos las funciones necesarias:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

#Recordamos el modelo definido anteriormente:
lr = LinearRegression(featuresCol="features", labelCol="crew")

# Creamos una variable la cual llmaremos evaluador para poder evaluar la métrica RMSE:
evaluador = RegressionEvaluator(labelCol="crew", predictionCol="prediction", metricName="rmse")

# Realizamos una Validación Cruzada con el mínimo de K-Folds que se recomiendan, pues K debe estar entre 5 y 10 normalmente:
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=[{}],  # Necesario aunque esté vacío, pues busca hiperparámetros pero nosotros solo queremos hacer la validación cruzada
    evaluator=evaluador,
    numFolds=5,
    parallelism=2
)

# Entrenamos la validación cruzada con los datos de entrenamiento:
cv_model = cv.fit(train_data)

# Obtenemos el mejor modelo calculado:
mejor_modelo = cv_model.bestModel
print("Coeficientes:", mejor_modelo.coefficients)
print("Intercepto:", mejor_modelo.intercept)

# Evaluamos este modelo en el conjunto de prueba:
predictions = mejor_modelo.transform(test_data)
rmse = evaluador.evaluate(predictions)
print("El RMSE en el conjunto de prueba será:", rmse)


Coeficientes: [-0.01505543388557455,0.014890944261644891,-0.149505433947062,0.3970326212911475,0.8152374984841201,-0.009084173217698445,0.05427921331536826]
Intercepto: -0.613744488715491
El RMSE en el conjunto de prueba será: 0.6327865706694425


El resultado de la validación cruzada tiene un error cuadrático medio de 0.6327 el cual no es malo, nos permite ver que nuestro modelo predice bien y divide bien el conjunto de los datos en entrenamiento y prueba.

#### Conclusión
Nuestro modelo de regresión lineal diseñado para predecir el número de tripulantes es bastante bueno, es capaz de relizar buenas predicciones sobre el número de tripulantes de los barcos. Además el modelo que hemos definido no se sobreajusta a los datos ni se subajusta. Posee buenas métricas de rendimiento y buenos residuos.

# Ejercicio 4: Regresión Logística: Abandono de Clientes (Customer Churn) - Clasificación Binaria

Una agencia de marketing tiene numerosos clientes que utilizan sus servicios para producir anuncios en sus sitios web. Sin embargo, han notado que una parte considerable de sus clientes deja de contratar sus servicios con el tiempo (lo que se conoce como *churn* o abandono).

Actualmente, asignan gestores de cuentas de forma aleatoria, pero quieren que desarrolles un modelo de *machine learning* que prediga qué clientes tienen más probabilidades de abandonar el servicio. De esta manera, podrán asignar estratégicamente un gestor de cuentas a aquellos clientes que presenten mayor riesgo de abandono.

Afortunadamente, cuentan con datos históricos. ¿Puedes ayudarles? El objetivo es crear un algoritmo de clasificación que indique si un cliente va a abandonar o no.

El conjunto de datos se encuentra en el archivo [customer_churn.csv](https://github.com/ssanchezgoe/viu_big_data/blob/main/data/customer_churn.csv). A continuación, se describen los campos disponibles:

- **Name**: Nombre del último contacto en la empresa cliente
- **Age**: Edad del cliente
- **Total_Purchase**: Total de anuncios comprados
- **Account_Manager**: Variable binaria (0 = sin gestor, 1 = con gestor asignado)
- **Years**: Años totales como cliente
- **Num_sites**: Número de sitios web que utilizan el servicio
- **Onboard_date**: Fecha en la que se incorporó el último contacto
- **Location**: Dirección de la sede del cliente
- **Company**: Nombre de la empresa cliente

**Objetivos**

1. Crea un modelo de clasificación para predecir si un cliente abandonará el servicio.
2. Evalúa el rendimiento del modelo.
3. Utiliza el modelo para predecir el riesgo de abandono en nuevos clientes, cuyos datos se encuentran en [new_customers.csv](https://github.com/ssanchezgoe/viu_big_data/blob/main/data/new_customers.csv). Ten en cuenta que este conjunto no incluye la variable objetivo, por lo que tu predicción será la única información disponible para tomar decisiones.

Este análisis ayudará a la agencia a priorizar la atención personalizada hacia los clientes con mayor riesgo de abandono.


#### Importamos y analizamos los datos

In [0]:
#Cragamos los datos del archivo descrito en la formulación del problema:
datos_customers = spark.read.csv("/FileStore/tables/customer_churn-3.csv",inferSchema=True,header=True)

In [0]:
#Imprimimos los datos por pantalla:
datos_customers.display()

Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
Cameron Williams,42.0,11066.8,0,7.22,8.0,2013-08-30T07:00:40.000+0000,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1
Kevin Mueller,41.0,11916.22,0,6.5,11.0,2013-08-13T00:38:46.000+0000,"6157 Frank Gardens Suite 019 Carloshaven, RI 17756",Wilson PLC,1
Eric Lozano,38.0,12884.75,0,6.67,12.0,2016-06-29T06:20:07.000+0000,"1331 Keith Court Alyssahaven, DE 90114","Miller, Johnson and Wallace",1
Phillip White,42.0,8010.76,0,6.71,10.0,2014-04-22T12:43:12.000+0000,"13120 Daniel Mount Angelabury, WY 30645-4695",Smith Inc,1
Cynthia Norton,37.0,9191.58,0,5.56,9.0,2016-01-19T15:31:15.000+0000,"765 Tricia Row Karenshire, MH 71730",Love-Jones,1
Jessica Williams,48.0,10356.02,0,5.12,8.0,2009-03-03T23:13:37.000+0000,"6187 Olson Mountains East Vincentborough, PR 74359",Kelly-Warren,1
Eric Butler,44.0,11331.58,1,5.23,11.0,2016-12-05T03:35:43.000+0000,"4846 Savannah Road West Justin, IA 87713-3460",Reynolds-Sheppard,1
Zachary Walsh,32.0,9885.12,1,6.92,9.0,2006-03-09T14:50:20.000+0000,"25271 Roy Expressway Suite 147 Brownport, FM 59852-6150",Singh-Cole,1
Ashlee Carr,43.0,14062.6,1,5.46,11.0,2011-09-29T05:47:23.000+0000,"3725 Caroline Stravenue South Christineview, MA 82059",Lopez PLC,1
Jennifer Lynch,40.0,8066.94,1,7.11,11.0,2006-03-28T15:42:45.000+0000,"363 Sandra Lodge Suite 144 South Ann, WI 51655-7561",Reed-Martinez,1


In [0]:
#Vemos el tipo de cada columna de los datos y mostramos por pantalla las columnas de los datos:
datos_customers.printSchema()
datos_customers.columns 

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)

Out[77]: ['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

Tenemos diez columnas distintas que corresponden a diferentes features. 
- Podemos encontrar datos de tres tipos: 
- `string` -> Columnas: `Names`, `Location`, `Company`
- `double` -> Columnas: `Age`, `Years`, `Total_Purchase`,`Num_Sites`
- `integer`-> Columnas: `Account_manager`, `Churn` 

#### Preprocesamiento de los datos 

Una vez que hemos identificado las diferentes columnas y el tipo de cada una de ellas vamos a ver si tenemos datos faltantes, los cuales pueden provocar fallos cuando entrenemos nuestro modelo. En el preprocesamiento de los datos también debemos incluir la búsqueda de valores atípicos entre otras asignaciones, pero lo veremos cuando evaluemos el modelo.

In [0]:
#Vamos a ver si hay valores nulos en nuestros datos, para ello emplearemos el mismo código que empleamos en el ejercicio anterior:

#Importamos las funciones necesarias: 
from pyspark.sql.functions import col, sum as spark_sum

#Contamos los valores nulos por columnas:
null_counts = datos_customers.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in datos_customers.columns])

#Imprimimos los resultados por columnas:
null_counts.show()

+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Onboard_date|Location|Company|Churn|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|    0|  0|             0|              0|    0|        0|           0|       0|      0|    0|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+



Otra forma de calcularlo es la siguiente, la cual expusimos en clase:

In [0]:
from pyspark.sql.functions import col, sum, when

missing_data = datos_customers.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in datos_customers.columns
])
missing_data.display()

Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
0,0,0,0,0,0,0,0,0,0


En ambas implementaciones **obtenemos lo mismo**, una fila que indica los valores faltantes del DataFrame original que son como vemos cero. Lo cual nos indica que los datos están bien recopilados y que **no hay valores faltantes** en ninguna de las columnas.

### Selección de Caracetrísticas
En esta etapa vamos a seleccionar las características más relevantes para poder decir si un cliente abandonará los servicios prestados por la agencia de márketing.

Vamos a seleccionar cuales son las caracetrísticas más relevantes y vamos a eliminar aquellas que no influyen en la clasificación de un cliente a la hora de determinar el abanadono de los servicios de marketing. 

In [0]:
datos_customers.columns

Out[80]: ['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

#### Variables eliminadas
Vamos a eliminar las variables `Name`, `Location` y `Company`. Estas características **no aportan información relevante** para determinar si una persona abandonará los servicios de marketing (cacarterística `Churn`) pues no están correlacionadas con la contratación de este tipo de servicios.


Tomaremos como caracetrísticas relevantes:

- **Age**: Edad del cliente
- **Total_Purchase**: Total de anuncios comprados
- **Account_Manager**: Variable binaria (0 = sin gestor, 1 = con gestor asignado)
- **Years**: Años totales como cliente
- **Num_sites**: Número de sitios web que utilizan el servicio
- **Onboard_date**: Fecha en la que se incorporó el último contacto

In [0]:
#Definimos las columnas de nuestro modelo eliminando las características irrelevantes:
columnas_costumers = datos_customers.select(['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Churn'])

## Imputación de datos

A continuación, imputaremos los datos, simplemente eliminando aquellas columnas con datos faltantes.

In [0]:
# Número de filas antes de eliminar nulos
filas_antes = columnas_costumers.count()

# Eliminamos filas con datos faltantes (IMPUTACIÓN)
my_final_data = columnas_costumers.na.drop()

# Número de filas después
filas_despues = my_final_data.count()

# Cálculo del porcentaje de filas eliminadas
porcentaje_eliminadas = ((filas_antes - filas_despues) / filas_antes) * 100

# Mostrar el resultado
print(f"Filas antes: {filas_antes}")
print(f"Filas después: {filas_despues}")
print(f"Porcentaje de filas eliminadas: {porcentaje_eliminadas:.2f}%")

Filas antes: 900
Filas después: 900
Porcentaje de filas eliminadas: 0.00%


Hemos vuelto a verificar lo que ya habíamos visto anteriormente, no tenemos columnas con datos faltantes, pues el porcentaje de filas eliminadas es de 0.00%

#### Preparación de los datos para el entrenamiento:
Vamos a definir el tipo de cada característica para que se puedan empleare en el entrenamiento del modelo. Para posteriormente podamos definir el vector de características (**Vector de ensamble**) para emplearlo en el entrenamiento de nuestro modelo de regresión y poder realizar las predicciones de abandono de los servicios de márketing, correspondiente a la variable dependiente `Churn`.

In [0]:
#Recordemos el tipo de nuestras columnas:
columnas_costumers.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Churn: integer (nullable = true)



La única variable la cual no posee un buen tipo es `Onboard_date` la cual posee el tipo de `timestamp`. Para solventar este problema vamos a convertir esa catacterística **temporal** en una característica **numérica**.

Para la conversión de la caracetrística numérica `Onboard_date` vamos a seleccionar el año en el cual se incorporó el ultimo contacto:

In [0]:
#Importamos la función year para extraer el año de mis datos de tiempo timestamp:
from pyspark.sql.functions import year

# Creamos una nueva columna con solo el año:
datos_customers = datos_customers.withColumn("Onboard_year", year("Onboard_date"))

# Redefinimos las columnas eliminando la columna Onboard_date y sustituyéndola por la ya definida Onborad_year:
columnas_costumers = datos_customers.select([
    'Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
    'Num_Sites',
    'Onboard_year',  
    'Churn' #Variable que vamos a predecir 
])
columnas_costumers.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_year: integer (nullable = true)
 |-- Churn: integer (nullable = true)



A continuación procedemos a contruir el **vector de ensamble** con las características para entrenar el modelo.

In [0]:
#Definimos las funciones que vamos a emplear:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer)

assembler = VectorAssembler (inputCols = ['Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
    'Num_Sites',
    'Onboard_year'], outputCol= 'features')

Una vez hemos creado el **vector de ensamble**, dividimos el conjunto de datos en dos conjuntos uno de **entrenamiento** y otro de **prueba**.

#### Construcción del conjunto de entrenamiento y prueba

In [0]:
#Dividimos el conjunto de datos en entrenamiento y prueba. Vamos a dividirlo en un 70% de datos de entrenamiento y un 30% de prueba:
datos_entrenamiento, datos_prueba = columnas_costumers.randomSplit([0.7,.3])

#Aplicamos la variable assmbler ya definido para transformar los datos de entrenamiento y prueba.
datos_entrenamiento = assembler.transform(datos_entrenamiento)
datos_prueba = assembler.transform(datos_prueba)

A continuación definimos el modelo de **regresión logística** y entrenamos el modelo con el **conjunto de prueba**.

In [0]:
#Importamos la funcioón necesaria par definir el modelo:
from pyspark.ml.classification import LogisticRegression

#Definimos el modelo de regresión logística:
reg_log = LogisticRegression(featuresCol='features',labelCol='Churn')



In [0]:
#Entrenamos el modelo con los datos de entrenamiento:
modelo_entrenado = reg_log.fit(datos_entrenamiento)

#Rendimiento estadístico del modelo entrenado con los datos de entrenamiento:
#resumen_estadístico = modelo_entrenado.summary()

#A continuación aplicamos este modelo entrenado sobre los datos de prueba para ver las predicciones:
resultados = modelo_entrenado.transform(datos_prueba)

#Imprimimos los resultados 
resultados.display()

Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_year,Churn,features,rawPrediction,probability,prediction
25.0,9672.03,0,5.49,8.0,2006,0,"Map(vectorType -> dense, length -> 6, values -> List(25.0, 9672.03, 0.0, 5.49, 8.0, 2006.0))","Map(vectorType -> dense, length -> 2, values -> List(4.96811932988949, -4.96811932988949))","Map(vectorType -> dense, length -> 2, values -> List(0.9930918365991236, 0.00690816340087641))",0.0
27.0,8628.8,1,5.3,7.0,2016,0,"Map(vectorType -> dense, length -> 6, values -> List(27.0, 8628.8, 1.0, 5.3, 7.0, 2016.0))","Map(vectorType -> dense, length -> 2, values -> List(5.883865998051604, -5.883865998051604))","Map(vectorType -> dense, length -> 2, values -> List(0.9972237276473231, 0.0027762723526768562))",0.0
28.0,11204.23,0,3.67,11.0,2012,0,"Map(vectorType -> dense, length -> 6, values -> List(28.0, 11204.23, 0.0, 3.67, 11.0, 2012.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5301067785581637, -1.5301067785581637))","Map(vectorType -> dense, length -> 2, values -> List(0.8220219366538991, 0.1779780633461009))",0.0
29.0,9617.59,0,5.49,8.0,2013,0,"Map(vectorType -> dense, length -> 6, values -> List(29.0, 9617.59, 0.0, 5.49, 8.0, 2013.0))","Map(vectorType -> dense, length -> 2, values -> List(4.705753095364079, -4.705753095364079))","Map(vectorType -> dense, length -> 2, values -> List(0.9910379434503924, 0.008962056549607578))",0.0
29.0,10203.18,1,5.82,8.0,2014,0,"Map(vectorType -> dense, length -> 6, values -> List(29.0, 10203.18, 1.0, 5.82, 8.0, 2014.0))","Map(vectorType -> dense, length -> 2, values -> List(4.007512274883712, -4.007512274883712))","Map(vectorType -> dense, length -> 2, values -> List(0.9821459977934807, 0.017854002206519337))",0.0
29.0,12711.15,0,5.74,7.0,2011,0,"Map(vectorType -> dense, length -> 6, values -> List(29.0, 12711.15, 0.0, 5.74, 7.0, 2011.0))","Map(vectorType -> dense, length -> 2, values -> List(5.748678128553802, -5.748678128553802))","Map(vectorType -> dense, length -> 2, values -> List(0.9968231338377529, 0.003176866162247105))",0.0
29.0,13255.05,1,4.89,8.0,2016,0,"Map(vectorType -> dense, length -> 6, values -> List(29.0, 13255.05, 1.0, 4.89, 8.0, 2016.0))","Map(vectorType -> dense, length -> 2, values -> List(4.310710692906017, -4.310710692906017))","Map(vectorType -> dense, length -> 2, values -> List(0.9867538110603359, 0.013246188939664116))",0.0
30.0,6744.87,0,5.14,9.0,2014,0,"Map(vectorType -> dense, length -> 6, values -> List(30.0, 6744.87, 0.0, 5.14, 9.0, 2014.0))","Map(vectorType -> dense, length -> 2, values -> List(3.630839946121796, -3.630839946121796))","Map(vectorType -> dense, length -> 2, values -> List(0.9741898894514716, 0.025810110548528442))",0.0
30.0,11575.37,1,5.22,8.0,2016,1,"Map(vectorType -> dense, length -> 6, values -> List(30.0, 11575.37, 1.0, 5.22, 8.0, 2016.0))","Map(vectorType -> dense, length -> 2, values -> List(4.171552740074116, -4.171552740074116))","Map(vectorType -> dense, length -> 2, values -> List(0.9848061299647559, 0.015193870035244084))",0.0
31.0,8829.83,1,4.52,8.0,2006,0,"Map(vectorType -> dense, length -> 6, values -> List(31.0, 8829.83, 1.0, 4.52, 8.0, 2006.0))","Map(vectorType -> dense, length -> 2, values -> List(4.620726012516961, -4.620726012516961))","Map(vectorType -> dense, length -> 2, values -> List(0.990250346229865, 0.009749653770135036))",0.0


#### Evaluación del modelo con los datos de entrenamiento
En este apartado vamos a comparar las predicciones realizadas con los datos de entrenamiento y los datos originales:

In [0]:
#Seleccionamos y mostramos la columna de abandonos original (Churn) y la columna de las predicciones (prediction):
resultados.select('Churn','prediction').display()

Churn,prediction
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
1,0.0
0,0.0


Una vez que hemos visto como clasifica los datos de prueba nuestro modelo entrenado sobre los datos de entrenamiento vamos a importar el **`BinaryClassificationEvaluator`** para evaluar el rendimiento del modelo de clasificación binaria. 

In [0]:
#Importamos el clasificador binario:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Definimos un evaluador para calcular el área bajo la curva ROC (por defecto) aplicando el BinaryClassificationEvaluator: 
evaluador = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='Churn',
    metricName='areaUnderROC'  # Empleamos como métrica de rendimiento el área bajo la curva ROC
)

# Calculamos la métrica a partir del evaluador y de las predicciones realizadas con el modelo entrenado con los datos de entrenamiento:
roc_auc = evaluador.evaluate(resultados)
print("Área bajo la curva ROC (entrenamiento):", roc_auc)

Área bajo la curva ROC (entrenamiento): 0.8153162055335968


El área bajo la curva ROC tiene un valo de 0.844 el cual nos indica que el modelo es bueno para los datos de entrenamiento, pues este será capaz de clasificar bien los datos. Es decir, no clasifica el abandono de los servicios de marketing de forma aleatoria.

In [0]:
AUC = evaluador.evaluate(resultados)
print(f"AUC del modelo: {AUC:.3f}")

AUC del modelo: 0.815


3. Utiliza el modelo para predecir el riesgo de abandono en nuevos clientes, cuyos datos se encuentran en [new_customers.csv](https://github.com/ssanchezgoe/viu_big_data/blob/main/data/new_customers.csv). Ten en cuenta que este conjunto no incluye la variable objetivo, por lo que tu predicción será la única información disponible para tomar decisiones.


Para comenzar con la clasificación de los nuevos clientes antes de aplicar el modelo de regresión logística definido previamente vamos a cargar y mostrar los datos por pantalla

In [0]:
#Importamos los datos con los nuevos clientes 
new_customers = spark.read.csv("/FileStore/tables/new_customers.csv", header=True, inferSchema=True)
#Imprimimos los datos por pantalla:
new_customers.display()

Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company
Andrew Mccall,37.0,9935.53,1,7.71,8.0,2011-08-29T18:37:54.000+0000,"38612 Johnny Stravenue Nataliebury, WI 15717-8316",King Ltd
Michele Wright,23.0,7526.94,1,9.28,15.0,2013-07-22T18:19:54.000+0000,"21083 Nicole Junction Suite 332, Youngport, ME 23686-4381",Cannon-Benson
Jeremy Chang,65.0,100.0,1,1.0,15.0,2006-12-11T07:48:13.000+0000,"085 Austin Views Lake Julialand, WY 63726-4298",Barron-Robertson
Megan Ferguson,32.0,6487.5,0,9.4,14.0,2016-10-28T05:32:13.000+0000,"922 Wright Branch North Cynthialand, NC 64721",Sexton-Golden
Taylor Young,32.0,13147.71,1,10.0,8.0,2012-03-20T00:36:46.000+0000,Unit 0789 Box 0734 DPO AP 39702,Wood LLC
Jessica Drake,22.0,8445.26,1,3.46,14.0,2011-02-04T19:29:27.000+0000,1148 Tina Stravenue Apt. 978 South Carlos TX 21222 9221,Parks-Robbins


In [0]:
#Mostramos las columnas de los datos 
new_customers.columns

Out[93]: ['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company']

Volvemos a crear el **vector de ensamble** para todas las caracetrísticas que contiene el documento `new_costumers`.

In [0]:


#Definimos las características que hemos considerado relevantes para predecir el abandono de los servicios de marketing en los clientes. Recordemos que "Name", "Location" y "Company" son las tres características que no son significativas o relevantes.

#Al igual que hicimos anteriormente eliminamos las columnas que no aportan información relevante así como cambiar el tipo de la columna "Onboard_date":

#Importamos la función year para extraer el año de mis datos de tiempo timestamp:
from pyspark.sql.functions import year

# Agregamos una nueva columna con solo el año de Onboard_Date:
datos_new_customers = new_customers.withColumn("Onboard_year", year("Onboard_date"))

# Redefinimos las columnas eliminando la columna Onboard_date y sustituyéndola por la ya definida Onborad_year:
columnas_costumers = datos_new_customers.select([
    'Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
    'Num_Sites',
    'Onboard_year'
])
columnas_costumers.printSchema()

#Importamos las funciones para crear el vector de ensamble con las columnas redefinidas en "columnas_customers":

from pyspark.ml.feature import (VectorAssembler,VectorIndexer)

assembler_new = VectorAssembler (inputCols = ['Age',
    'Total_Purchase',
    'Account_Manager',
    'Years',
    'Num_Sites',
    'Onboard_year'], outputCol= 'features')


root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_year: integer (nullable = true)



Una vez definido este **vector de ensamble** vamos a emplearlo junto con el modelo ya entrenado para transformar nuestros datos y realizar una predicción sobre la clasificación de los nuevos clientes:

In [0]:
#Transformamos nuestros datos originales con la columna "Onboard_year" con la función definida "assembler_new":
new_customers_transf = assembler_new.transform(datos_new_customers)

# Empleamos el modelo previamente entrenado para relizar las predicciones:
pred_nuevos_clientes = modelo_entrenado.transform(new_customers_transf)

# Tomamos las columnas necesarias para analizar los resultados que serán las columnas que contengan las predicciones y la probabilidad. Mostramos las predicciones para los diez primeros clientes 
pred_nuevos_clientes.select("prediction", "probability").show(10)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.92245538870791...|
|       1.0|[0.00112414669827...|
|       1.0|[0.00724666504198...|
|       1.0|[0.00387455214533...|
|       0.0|[0.80671441758863...|
|       1.0|[0.08568566074481...|
+----------+--------------------+



Por último vamos a evaluar la calidad de las predicciones calculando algunas de las métricas de rendimiento más relevantes

In [0]:
#Importamos de nuevo el clasificador binario y multiclase:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Calculamos el Accuracy, que corresponde al cociente entre el número de predicciones bien hechas sobre el total:
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Churn", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(resultados)
print(f"Accuracy: {accuracy:.4f}")

#Calulamos el F1-Score que es una métrica muy útil porque sirve para datos desbalanceados a diferencia del accuracy:
f1_evaluator = MulticlassClassificationEvaluator(labelCol="Churn", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(resultados)
print(f"F1 Score: {f1:.4f}")

Accuracy: 0.8905
F1 Score: 0.8924


Vemos que las predicciones son muy buenas pues el **accuracy** y el **F1-Score** poseen valores muy cercanos a 1. Por lo tanto podemos concluir que nuestro modelo de regresión logística es capaz de realizar uan buena clasificación de los clientes y determinar con cierta seguridad si un cliente abandonará o no los servicios de marketing.