**Descripción corta del challenge**

A partir de una BD de clientes de una empresa de telefonía, se realizan una serie de consultas con PySpark-SQL.

In [0]:
# Se realizan todos los imports necesarios
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Cargamos las tablas de la BD
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# Mostramos las tablas
df_clientes.show(5)
df_ofertas.show(5)
df_facturas_mes_ant.show(5)
df_facturas_mes_actual.show(5)
df_consumos_diarios.show(5)

+----------+---------------+----+----+-----------+--------+
|id_cliente|         nombre|edad|sexo|  provincia|    pais|
+----------+---------------+----+----+-----------+--------+
|         4|    Pedro Conde|  32|   M|     Madrid|  España|
|         5|     Ana Robles|  41|   F| Valladolid|  España|
|         6|   David Roldan|  74|   M|Guadalajara|  Mexico|
|         1|    Pablo Perez|  26|   M|     Madrid|  España|
|         2|Eduardo Redondo|  58|   M|     Bogota|Colombia|
+----------+---------------+----+----+-----------+--------+
only showing top 5 rows

+---------+--------------------+-------+
|id_oferta|         descripcion|importe|
+---------+--------------------+-------+
|        6|Fibra Optica 600M...|  124.5|
|        3|Fibra Optica 300M...|   85.0|
|        1|Fibra Optica 300M...|   70.0|
|        2|Fibra Optica 600M...|   78.5|
|        4|Fibra Optica 600M...|   95.0|
+---------+--------------------+-------+
only showing top 5 rows

+----------+---------+-------+----------+

**1. Mostrar por pantalla el número TOTAL de clientes que tenían en el mes anterior más de un contrato con la compañía**

In [0]:
# Hacemos una agrupación del dataframe por id_cliente, para luego contar cuántas registros tenemos de cada id_cliente
# (lo cual equivale a contar los contratos que tiene cada cliente). Luego, filtramos aquellos clientes que tengan más
# de un contrato, y contamos los registros que nos quedan

clientes_vip_mes_anterior = df_facturas_mes_ant.groupBy("id_cliente")\
                                                .agg(count("id_cliente"))\
                                                .filter(col("count(id_cliente)") > 1)\
                                                .count()

print("El número de clientes con más de un contrato el mes anterior es", clientes_vip_mes_anterior)

El número de clientes con más de un contrato el mes anterior es 8


**2. Generar un nuevo dataframe de facturas del mes actual que asigne un 7% de descuento a todos los contratos de clientes que ya existían en el mes anterior (los contratos de los clientes nuevos seguirán con el mismo importe). Mantener la columna "importe" y crear una nueva columna "importe_dto" con el nuevo importe, casteada a 2 decimales. Mostrar además el resultado ordenado por los campos id_cliente ascendente, importe descendente. **

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En el df_facturas_mes_ant marcamos con un flag a todos los clientes existentes, y luego hacemos left join con el df_facturas_mes_actual
# Así obtenemos un df con los clientes del mes actual, marcados con un flag si existían el mes anterior.
# Eliminamos los duplicados que nos genera el join debido a la multiplicidad de facturas, para un mismo cliente.

df_facturas_mes_actual_flg = df_facturas_mes_actual.join(df_facturas_mes_ant.select("id_cliente").withColumn("flg_mes_ant", lit(1)),["id_cliente"],"left")\
                                                                             .dropDuplicates(["id_cliente","id_oferta","importe"]).orderBy("id_cliente")

# Generamos una nueva columna que poblamos de valores, aplicando descuento si el cliente estaba el mes anterior.
# Ordenamos de la forma que se nos ha pedido

df_facturas_mes_actual_flg.withColumn("importe_dto", when(col("flg_mes_ant") == 1, (col("importe")*0.93).cast(DecimalType(scale=2))).otherwise(col("importe").cast(DecimalType(scale=2)))) \
                            .orderBy(asc("id_cliente"), desc("importe")).drop("flg_mes_ant").show()

+----------+---------+-------+----------+-----------+
|id_cliente|id_oferta|importe|     fecha|importe_dto|
+----------+---------+-------+----------+-----------+
|         1|        6|  124.5|2020-08-31|     115.79|
|         1|        6|  118.6|2020-08-31|     110.30|
|         2|        4|   95.0|2020-08-31|      88.35|
|         3|        5|  102.5|2020-08-31|      95.33|
|         4|        1|   70.0|2020-08-31|      70.00|
|         5|        7|   80.5|2020-08-31|      74.87|
|         5|        9|   40.5|2020-08-31|      37.67|
|         6|        8|   84.0|2020-08-31|      78.12|
|         7|        4|   95.0|2020-08-31|      88.35|
|         8|        5|  100.5|2020-08-31|      93.47|
|         9|        7|   78.5|2020-08-31|      78.50|
|         9|       10|  29.99|2020-08-31|      29.99|
|        10|        6|  124.5|2020-08-31|     115.79|
|        11|        3|   83.0|2020-08-31|      77.19|
|        12|        1|   70.0|2020-08-31|      65.10|
|        13|        4|   95.

**3. Por problemas de saturación en la red debido al incremento en el uso de datos por el confinamiento debido a la COVID-19, se decide limitar el uso de datos este mes subiendo la tarifa de todas las ofertas que incluyan "datos ilimitados" en un 15%. Obtener un DF con las mismas columnas que el DF de facturas del mes actual y a mayores la columna "importe_dto"**.

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En primer lugar, en el df_ofertas buscamos con "contains" aquellas ofertas que tienen datos ilimitados. Una vez hecho esto procedemos de forma similar a lo realizado en el punto 2, 
# marcando con un flag las ofertas de datos ilimitados.

df_facturas_mes_actual_flg = df_facturas_mes_actual.join(df_ofertas.filter(col("descripcion").contains("datos ilimitados")).withColumn("datos_ilimitados", lit(1)) \
                                                         .select("id_oferta","datos_ilimitados"), ["id_oferta"], "left")

# Similar al punto 2 también, generamos una nueva columna basándonos en si el cliente tiene o no datos ilimitados.

df_facturas_mes_actual_flg.withColumn("importe_dto", when(col("datos_ilimitados") == 1, (col("importe")*1.15).cast(DecimalType(scale=2))).otherwise(col("importe").cast(DecimalType(scale=2)))) \
                            .drop("datos_ilimitados").orderBy("id_cliente").show()

+---------+----------+-------+----------+-----------+
|id_oferta|id_cliente|importe|     fecha|importe_dto|
+---------+----------+-------+----------+-----------+
|        6|         1|  124.5|2020-08-31|     143.17|
|        6|         1|  118.6|2020-08-31|     136.39|
|        4|         2|   95.0|2020-08-31|     109.25|
|        5|         3|  102.5|2020-08-31|     117.87|
|        1|         4|   70.0|2020-08-31|      70.00|
|        7|         5|   80.5|2020-08-31|      80.50|
|        9|         5|   40.5|2020-08-31|      46.57|
|        8|         6|   84.0|2020-08-31|      96.60|
|        4|         7|   95.0|2020-08-31|     109.25|
|        5|         8|  100.5|2020-08-31|     115.57|
|       10|         9|  29.99|2020-08-31|      29.99|
|        7|         9|   78.5|2020-08-31|      78.50|
|        6|        10|  124.5|2020-08-31|     143.17|
|        3|        11|   83.0|2020-08-31|      95.45|
|        1|        12|   70.0|2020-08-31|      70.00|
|        4|        13|   95.

**4. Crear una nueva variable "grupo_edad" que agrupe a los clientes, tanto del mes_actual como del mes_anterior en 4 rangos según su edad asignando valores del 1 - 4 segun el rango al que pertenezcan (18-25 (1), 26-40 (2), 41-65 (3), >65 (4)). Obtener una tabla resumen que extraiga para cada para cada uno de los 4 grupos identificados la MEDIA de consumo de datos, sms enviados, minutos_movil, minutos_fijo, con todos los campos casteados a 2 decimales y ordenar el DF por grupo_edad ascendente. Extraer conclusiones.**

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En primer lugar, en el dataframe de consumos calculamos, para cada cliente, el total de datos, sms, movil y fijo consumidos. 
# Los necesitaremos posteriormente para calcular los consumos medios por grupo

df_resumen = df_consumos_diarios.groupBy("id_cliente").agg(sum("consumo_datos_MB"), sum("sms_enviados"), sum("minutos_llamadas_movil"), sum("minutos_llamadas_fijo")).orderBy("id_cliente")

# Sobre el dataframe de clientes averiguamos los grupo de edad a los que pertenece cada cliente,
# y se los añadimos al df_resumen que hemos creado previamente

df_join = df_clientes.withColumn("grupo_edad", when(col("edad") < 26, lit(1))
                                         .when(col("edad").between(26,40),lit(2))
                                         .when(col("edad").between(41,65),lit(3))
                                         .otherwise(lit(4)))

df_resumen = df_resumen.join(df_join.select("id_cliente","grupo_edad"),["id_cliente"],"left").orderBy("id_cliente")

# Agrupamos por grupo_edad y para cada grupo calculamos las medias de consumo, casteadas a 2 decimales.

df_resumen = df_resumen.groupBy("grupo_edad").agg(mean("sum(consumo_datos_MB)").cast(DecimalType(scale=2)).alias("Media_consumo_datos_MB"), 
                                                  mean("sum(sms_enviados)").cast(DecimalType(scale=2)).alias("Media_sms_enviados"), 
                                                  mean("sum(minutos_llamadas_movil)").cast(DecimalType(scale=2)).alias("Media_minutos_llamadas_movil"), 
                                                  mean("sum(minutos_llamadas_fijo)").cast(DecimalType(scale=2)).alias("Media_minutos_llamadas_fijo")) \
                                                .orderBy("grupo_edad")

df_resumen.show()

# Conclusiones: En la tecnología más reciente para el uso en teléfonos (datos), hay más consumo cuanto menor es la edad.
# En las tecnologías que quizá hayan quedado más obsoletas (sms y fijo), hay más consumo cuanto mayor es la edad.
# En cuanto a las llamadas a móvil los grupos intemedios son los que presentan los valores máximos, quizá por tener mayor poder adquisitivo,
# quizá porque los más jóvenes prefieran sustituir la llamada a móvil por datos y los más mayores, por una llamada a fijo... sería necesario indagar más.

+----------+----------------------+------------------+----------------------------+---------------------------+
|grupo_edad|Media_consumo_datos_MB|Media_sms_enviados|Media_minutos_llamadas_movil|Media_minutos_llamadas_fijo|
+----------+----------------------+------------------+----------------------------+---------------------------+
|         1|              41485.40|             16.60|                      588.00|                     336.80|
|         2|              33569.50|             35.00|                     1081.13|                     674.50|
|         3|              14672.11|            162.89|                     1005.56|                     819.44|
|         4|               5307.00|            191.00|                      560.80|                    1108.40|
+----------+----------------------+------------------+----------------------------+---------------------------+



**5. Se quiere realizar un estudio por Sexo para analizar si son las mujeres o los hombres quiénes consumen más datos durante el fin de semana y hacen más llamadas desde el móvil. Para ello se deberá, sin ayuda de un calendario, extraer el día de la semana al que corresponde cada una de las fechas del mes de
Agosto para saber cuáles son fin de semana (se consideran días de fin de semana el viernes, sábado y domingo). </br> El DF a obtener deberá tener 2 registros con las columnas: sexo, total_mins_movil_finde, total_datos_moviles_finde. </br> Extraer conclusiones tras presentar el DF resultante.**

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
df_resumen = df_consumos_diarios.drop("sms_enviados", "minutos_llamadas_fijo")
# Creamos una nueva columna donde, con la función dayofweek, se asigna qué día es de la semana (el domingo es el día 1)
# Posteriormente, filtramos los resultados del dataframe para obtener sólo los registros de consumos en fin de semana
df_resumen = df_resumen.withColumn("diasemana", dayofweek(col("fecha"))).filter((col("diasemana") == 1) | (col("diasemana") == 6) | (col("diasemana") == 7))

# A ese dataframe resultante, le añadimos una columna que nos indique el sexo de cada cliente
df_resumen = df_resumen.join(df_clientes.select("id_cliente", "sexo"), ["id_cliente"], "left")

# Agrupamos por sexo y hallamos los consumos medios. Cada registro es un día, como queremos presentar los resultados con los valores por fin de semana
# y no por día, multiplicamos la media obtenida por 3
df_resumen = df_resumen.groupBy("sexo").agg((mean("minutos_llamadas_movil")*3).cast(DecimalType(scale=2)).alias("total_mins_movil_finde"), 
                               (mean("consumo_datos_MB")*3).cast(DecimalType(scale=2)).alias("total_datos_moviles_finde"))
df_resumen.show()

# Conclusiones:
# Las mujeres, durante los fines de semana, pasan más tiempo hablando por teléfono y consumen más datos.
# Para profundizar en el análisis, sería interesante obtener este mismo dataframe pero sin filtrar por los días de la semana, y comparar resultados con el dataframe obtenido aquí

+----+----------------------+-------------------------+
|sexo|total_mins_movil_finde|total_datos_moviles_finde|
+----+----------------------+-------------------------+
|   F|                119.04|                  2439.71|
|   M|                 52.94|                  2131.64|
+----+----------------------+-------------------------+



**6. Obtener un DF que contenga 4 registros, que serán el cliente de cada grupo edad que más datos móviles ha consumido durante los 15 primeros días del mes de Agosto (día 15 incluido en el cálculo). El DF deberá contener las columnas: nombre, edad, grupo_edad, datos_moviles_total_15, max_sms_enviados_15** </br> (max_sms_enviados_15 contiene el máximo de sms enviados en un día por el cliente con más datos_moviles consumidos de cada grupo_edad durante esos 15 primeros días del mes). </br>
**Extraer conclusiones en cuanto a datos consumidos y sms enviados por cada grupo_edad.**

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En primer lugar, en el df de consumos nos quedamos sólo con los registros correspondientes a los primeros 15 días del mes
df_resumen = df_consumos_diarios.filter(dayofmonth(col("fecha")) <= 15)

# En el df de clientes asignamos los grupos de edad, y con un join añadimos esta info, junto con nombre y edad que también la vamos a necesitar,
# al df de consumos
df_join = df_clientes.withColumn("grupo_edad", when(col("edad") < 26, lit(1))
                                         .when(col("edad").between(26,40),lit(2))
                                         .when(col("edad").between(41,65),lit(3))
                                         .otherwise(lit(4)))
df_resumen = df_resumen.join(df_join.select("id_cliente", "nombre", "edad", "grupo_edad"),["id_cliente"],"left")




# En primera instancia, con un groupby calculamos el total de datos consumidos para cada cliente en el periodo de 15 días, y cuál es el número máximo de sms enviados en un día

df_resumen = df_resumen.groupBy("grupo_edad", "nombre").agg(sum("consumo_datos_MB").alias("datos_moviles_total_15"), 
                                               max("sms_enviados").alias("max_sms_enviados_15"), 
                                               first("edad").alias("edad"))

df_resumen.show()

+----------+--------------------+----------------------+-------------------+----+
|grupo_edad|              nombre|datos_moviles_total_15|max_sms_enviados_15|edad|
+----------+--------------------+----------------------+-------------------+----+
|         4|       Marta Rodrigo|                  4657|                 16|  67|
|         2|        Romina Verde|                 23603|                  1|  29|
|         3|        Walter Ramos|                 11892|                 18|  54|
|         3|          Ana Robles|                  4256|                  8|  41|
|         1|       Carmen Arauzo|                 15402|                  3|  19|
|         4|        David Roldan|                  1506|                 22|  74|
|         2|         Pedro Conde|                 10163|                  4|  32|
|         2|         Pablo Perez|                 14328|                  3|  26|
|         3|     Eduardo Redondo|                  5078|                 12|  58|
|         4|    

In [0]:
# Una vez tenemos esta información, en una nueva columna y ayudándonos de las operaciones window, hacemos un ranking por uso de datos
# dentro de cada grupo.
# Nos quedamos sólo con el cliente de cada grupo que más ha consumido y mostramos resultados.

window_grupo_consumo =  Window.partitionBy("grupo_edad").orderBy(asc("grupo_edad"), desc("datos_moviles_total_15"))

df_resumen = df_resumen.withColumn("ranking_consumo", row_number().over(window_grupo_consumo)) \
                       .filter(col("ranking_consumo") == 1) \
                       .select("nombre", "edad", "grupo_edad", "datos_moviles_total_15", "max_sms_enviados_15")

df_resumen.show()

+-------------+----+----------+----------------------+-------------------+
|       nombre|edad|grupo_edad|datos_moviles_total_15|max_sms_enviados_15|
+-------------+----+----------+----------------------+-------------------+
| Clara Suarez|  21|         1|                 27665|                  2|
| Romina Verde|  29|         2|                 23603|                  1|
| Walter Ramos|  54|         3|                 11892|                 18|
|Marta Rodrigo|  67|         4|                  4657|                 16|
+-------------+----+----------+----------------------+-------------------+



**7. Nos interesa averiguar los minutos de llamadas de los clientes que son nuevos este mes para realizar un estudio del impacto que tendrán en caso de producirse un pico de volumen de llamadas en la red. </br> Obtener un DF que contenga sólo a los clientes nuevos de este mes con 4 columnas: nombre_cliente_nuevo, edad, importe_total_mes_actual, total_minutos. </br> Se ordenará el DF resultante por la columna "total_minutos" que contiene el total de minutos de llamadas fijas y móviles durante este mes para cada cliente, ordenándolos de manera que el primer cliente nuevo que aparezca sea el que menos minutos de llamadas ha realizado en el mes de Agosto.</br>** NOTAS: Si hay solo 2 clientes nuevos, el DF a obtener tendrá sólo 2 registros. </br> Tener presente que a la hora de hacer un join, la clave de unión debe ser única en la tabla de la derecha o de otra manera se multiplicarán los registros resultantes de manera descontrolada.
</br> Tener en cuenta que si un cliente nuevo tiene 2 ofertas contratadas, la columna importe_mes_actual deberá contener la suma de los importes de ambas ofertas.

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En esta sentencia, en primera instancia hacemos un left anti join para quedarnos sólo con los clientes nuevos, y en adelante ya sólo haremos los cálculos con ellos
# Además, al usar el df de facturas, calculamos ya el importe total pagado el mes actual para cada cliente nuevo
df_importe_total_mes = df_facturas_mes_actual.join(df_facturas_mes_ant, "id_cliente", "leftanti") \
                                                .groupBy("id_cliente").agg(sum("importe").alias("importe_total_mes"))

df_importe_total_mes.show()

+----------+-----------------+
|id_cliente|importe_total_mes|
+----------+-----------------+
|        28|            102.5|
|         4|             70.0|
|         9|           108.49|
|        19|             85.0|
|        21|            124.5|
+----------+-----------------+



In [0]:
# A la información recogida en la sentencia anterior le unimos el total de minutos hablados, recabados de la tabla de consumos.
# Para cada cliente nuevo se obtiene en primer lugar los minutos hablados en todo el mes, tanto para fijo como para móvil, y luego se suman ambos valores
df_total_minutos = df_consumos_diarios.join(df_importe_total_mes, "id_cliente", "inner") \
                                        .groupBy("id_cliente").agg(sum("minutos_llamadas_movil"),sum("minutos_llamadas_fijo"), first("importe_total_mes").alias("importe_total_mes")) \
                                        .withColumn("total_minutos", col("sum(minutos_llamadas_movil)") + col("sum(minutos_llamadas_fijo)")) \
                                        .drop("sum(minutos_llamadas_movil)", "sum(minutos_llamadas_fijo)")

# Por último añadimos información personal de estos clientes nuevos: nombre y edad
df_resumen = df_clientes.select("id_cliente", "nombre", "edad").join(df_total_minutos, "id_cliente", "inner") \
                            .drop("id_cliente").orderBy("total_minutos")

df_resumen.show()

+--------------------+----+-----------------+-------------+
|              nombre|edad|importe_total_mes|total_minutos|
+--------------------+----+-----------------+-------------+
|       David Cardoso|  24|           108.49|          368|
|        Walter Ramos|  54|            102.5|          528|
|Juan Carlos Iglesias|  38|             85.0|          923|
|         Pedro Conde|  32|             70.0|         1939|
|       Fatima Cuevas|  29|            124.5|         2366|
+--------------------+----+-----------------+-------------+



**8. Obtener un DF que contenga, para los clientes que ya existían en el mes anterior y siguen dados de alta este mes, 3 columnas: nombre, edad, n_dias_sin_sms. La última columna se refiere a obtener el número de veces que cada cliente NO ha enviado ningún SMS en el mes de Agosto. Forzar a que el comando "show" muestre 30 valores.** </br> (Si en todo el mes el cliente "A" no envío SMS 3 días en todo el mes, esta columna deberá contener el valor 3. Si hubiera algún cliente que hubiese enviado al menos 1 sms todos los dias del mes, tendrá valor 0 en esta columna).

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# Aprovechamos que el df de consumos diarios contiene información tanto de los clientes existentes este mes como de los sms enviados, realizaremos dos transformaciones en la misma sentencia
# En primer lugar hacemos un inner join con una columna que contenga todos los clientes del mes pasado una única vez, para evitar multiplicidad de registros.
# A continuación, recorremos la columna sms_enviados para crear otra que nos diga si ese día el cliente envió o no sms. Finalmente, para cada cliente sumamos días sin sms.
df_resumen = df_consumos_diarios.join(df_facturas_mes_ant.select("id_cliente").distinct(), "id_cliente", "inner") \
                                .withColumn("no_hay_sms", when(col("sms_enviados") == 0, lit(1)).otherwise(lit(0))) \
                                .groupby("id_cliente").agg(sum("no_hay_sms").alias("dias_sin_sms")).orderBy("id_cliente")

# Usamos el id cliente para recuperar de la tabla de clientes el resto de información que se nos pide

df_resumen = df_clientes.select("id_cliente", "nombre", "edad", "id_cliente").join(df_resumen, "id_cliente", "inner").orderBy("nombre").drop("id_cliente")

df_resumen.show(30)

+-----------------+----+------------+
|           nombre|edad|dias_sin_sms|
+-----------------+----+------------+
|    Alvaro Monroy|  75|           3|
|       Ana Robles|  41|           3|
|    Carmen Arauzo|  19|          19|
|     Celia Castro|  47|           0|
|     Clara Suarez|  21|          26|
|Cristian Cuadrado|  52|           5|
|     David Roldan|  74|           1|
|  Eduardo Redondo|  58|           7|
|   Fernanda Gomez|  78|          21|
|     Ines Barcero|  29|          13|
|      Jorge Recio|  18|          22|
|       Laura Luiz|  44|           7|
|   Luis Rodrigues|  20|          21|
|    Marta Rodrigo|  67|           2|
|    Melisa Aguado|  30|          20|
|      Pablo Lopez|  40|          11|
|      Pablo Perez|  26|          14|
|   Roberta Varado|  64|           1|
|  Roberto Salazar|  68|           3|
|     Romina Verde|  29|          23|
| Silvia Rodriguez|  49|           2|
|      Victor Ruiz|  53|           7|
+-----------------+----+------------+



**9. Queremos obtener un Coeficiente de Ponderación que nos permita evaluar a cada cliente en función de su consumo para identificar los clientes más atractivos que forman parte de nuestra compañía. ESTE CÁLCULO SÓLO SE REALIZARÁ PARA LOS CLIENTES QUE TENGAN UNA SOLA OFERTA CONTRATADA CON LA COMPAÑÍA. </br> Este coeficiente se obtendrá en base a los consumos diarios, y por tanto sólo se tendrán en cuenta los clientes que existen en el mes de Agosto, ya que no tenemos datos de consumo del mes de Julio. </br> Las ponderaciones que se darán a cada uno de los consumos son las siguientes:** 
- 0.4 --> llamadas desde telefóno móvil.
- 0.3 --> datos_móviles_MB
- 0.2 --> llamadas desde teléfono fijo
- 0.1 --> SMS enviados </br>

**Los pasos a seguir son los siguientes:**
  1. Obtener la suma de todos los días de cada uno de los 4 consumos para cada cliente.
  2. Obtener el máximo del cálculo anterior de todos los clientes para poder obtener un valor entre 0 y 1 para cada uno de los 4 consumos (recordar que sólo aplica para los clientes con UNA SOLA OFERTA CONTRATADA en el mes de Agosto. El cliente con mayor consumo de datos tendrá un valor de 1 en la columna 'datos_móviles_0_1')
  3. Multiplicar estas columnas obtenidas con valor entre 0 y 1 por su ponderación correspondiente (por ejemplo la columna "datos_moviles_0_1" se multiplicará por la ponderación de datos móviles 0.4).
  4. Por último se calculará la suma de las 4 ponderaciones obtenidas para calcular el Coeficiente de ponderación buscado (coeficiente_cliente), casteado a 3 decimales.
  5. Mostrar DF resultante ordenado por este coeficiente en sentido descendente, de manera que el primer registro corresponderá al cliente más atractivo. **El DF resultante tendrá 3 columnas: nombre, edad, coeficiente_cliente.**

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# En primer lugar, obtenemos el listado de clientes que sólo tienen contratada una oferta.
# Esto lo averiguamos en el df de facturas del mes actual, contando cuántas ofertas distintas tiene cada cliente

clientes_1oferta = df_facturas_mes_actual.groupBy("id_cliente").agg(countDistinct("id_oferta").alias("n_ofertas_distintas")) \
                                            .filter(col("n_ofertas_distintas") == 1).drop("n_ofertas_distintas")

# A continuación, hacemos inner join para quedarnos sólo con los clientes con 1 oferta, 
# y obtenemos la suma de todos los días de cada uno de los 4 consumos para cada cliente. 

df_resumen = df_consumos_diarios.join(clientes_1oferta, "id_cliente", "inner").groupBy("id_cliente").agg(sum("consumo_datos_MB").alias("datos_mensuales"), 
                                                                                            sum("sms_enviados").alias("sms_mensuales"),
                                                                                            sum("minutos_llamadas_movil").alias("minutos_movil_mensuales"),
                                                                                            sum("minutos_llamadas_fijo").alias("minutos_fijo_mensuales"))

# A partir de este df_resumen, nos sacamos un df_maximos donde guardaremos el consumo máximo para cada uno de los 4 productos, lo que nos servirá más tarde como base para calcular los coeficientes

df_maximos = df_resumen.agg(max("datos_mensuales"), max("sms_mensuales"), max("minutos_movil_mensuales"), max("minutos_fijo_mensuales"))

df_maximos.show()

+--------------------+------------------+----------------------------+---------------------------+
|max(datos_mensuales)|max(sms_mensuales)|max(minutos_movil_mensuales)|max(minutos_fijo_mensuales)|
+--------------------+------------------+----------------------------+---------------------------+
|               49512|               293|                        1921|                       1529|
+--------------------+------------------+----------------------------+---------------------------+



In [0]:
# En base a esos valores, ya podemos volver al df_resumen donde teníamos los consumos mensuales para cada cliente, 
# y obtener los coeficientes en base a estos consumos y a los valores de referencia de df_maximos, 
# ponderándolos también por los coeficientes 0.1-0.4 asignados a cada producto

df_resumen = df_resumen.withColumn("datos_0_1", col("datos_mensuales")/lit(df_maximos.collect()[0][0])*0.3) \
            .withColumn("sms_0_1", col("sms_mensuales")/lit(df_maximos.collect()[0][1])*0.1) \
            .withColumn("mins_movil_0_1", col("minutos_movil_mensuales")/lit(df_maximos.collect()[0][2])*0.4) \
            .withColumn("mins_fijo_0_1", col("minutos_fijo_mensuales")/lit(df_maximos.collect()[0][3])*0.2)

df_resumen.show()

+----------+---------------+-------------+-----------------------+----------------------+--------------------+--------------------+--------------------+--------------------+
|id_cliente|datos_mensuales|sms_mensuales|minutos_movil_mensuales|minutos_fijo_mensuales|           datos_0_1|             sms_0_1|      mins_movil_0_1|       mins_fijo_0_1|
+----------+---------------+-------------+-----------------------+----------------------+--------------------+--------------------+--------------------+--------------------+
|        29|          48922|           14|                   1340|                  1225|  0.2964251090644692|0.004778156996587031|  0.2790213430504946| 0.16023544800523218|
|        27|           6512|          233|                    570|                  1109| 0.03945710130877363|  0.0795221843003413| 0.11868818323789694| 0.14506213211249183|
|        28|          22768|          293|                    268|                   260| 0.13795443528841492|                 0.1

In [0]:
# Finalmente sumamos los coeficientes de cada producto para obtener el coeficiente total, los ordenamos por orden descendiente

df_resumen = df_resumen.withColumn("coeficiente_cliente", (col("datos_0_1") + col("sms_0_1") + col("mins_movil_0_1") + col("mins_fijo_0_1")).cast(DecimalType(scale=3))).select("id_cliente","coeficiente_cliente")

df_resumen = df_clientes.select("id_cliente", "nombre", "edad").join(df_resumen, "id_cliente", "inner").orderBy(desc("coeficiente_cliente")).drop("id_cliente")
df_resumen.show()

+--------------------+----+-------------------+
|              nombre|edad|coeficiente_cliente|
+--------------------+----+-------------------+
|        Romina Verde|  29|              0.740|
|       Fatima Cuevas|  29|              0.687|
|    Silvia Rodriguez|  49|              0.653|
|        Celia Castro|  47|              0.595|
|       Melisa Aguado|  30|              0.552|
|         Pedro Conde|  32|              0.517|
|       Carmen Arauzo|  19|              0.486|
|     Eduardo Redondo|  58|              0.440|
|         Pablo Perez|  26|              0.411|
|      Fernanda Gomez|  78|              0.410|
|Juan Carlos Iglesias|  38|              0.385|
|      Roberta Varado|  64|              0.383|
|        David Roldan|  74|              0.375|
|         Jorge Recio|  18|              0.356|
|        Walter Ramos|  54|              0.328|
|     Roberto Salazar|  68|              0.310|
|      Luis Rodrigues|  20|              0.309|
|         Pablo Lopez|  40|             

**10. Queremos averiguar la fecha en la que entre los 3 clientes que más datos consumen de cada grupo_edad llegan a un consumo de 20 GB de datos móviles. En caso de que algún grupo_edad no llegue entre los 3 clientes a 20 GB en todo el mes, se asignará "null" como valor de esta columna (Recordar que 1 GB = 1024 MB). </br> Obtener un DF que contiene 4 registros, uno para cada grupo_edad y 3 columnas: grupo_edad, fecha_20_GB, datos_moviles_total_grupo_3_clientes.** (datos_moviles_total_grupo_3_clientes representa el total de datos consumidos en MB por los 3 clientes del grupo hasta final de mes)

In [0]:
# Reseteamos tablas
df_clientes = spark.read.table("df_clientes")
df_ofertas = spark.read.table("df_ofertas")
df_facturas_mes_actual = spark.read.table("df_facturas_mes_actual")
df_facturas_mes_ant = spark.read.table("df_facturas_mes_ant")
df_consumos_diarios = spark.read.table("df_consumos_diarios")

In [0]:
# Copiamos el código de apartados anteriores para generar una columna grupo_edad en el df de clientes
df_join = df_clientes.withColumn("grupo_edad", when(col("edad") < 26, lit(1))
                                         .when(col("edad").between(26,40),lit(2))
                                         .when(col("edad").between(41,65),lit(3))
                                         .otherwise(lit(4)))

# Y asignamos este grupo de edad a cada cliente dentro del df de consumos
df_resumen = df_consumos_diarios.join(df_join.select("id_cliente","grupo_edad"),["id_cliente"],"left").orderBy("id_cliente")

# A continuación, vamos a obtener los 3 clientes de cada grupo que más datos han consumido.
window_grupo = Window.partitionBy("grupo_edad").orderBy(asc("grupo_edad"), desc("sum(consumo_datos_MB)"))

# Con los siguientes 3 métodos: 1. Agrupamos los clientes por edad e id, para cada uno de ellos vemos el consumo total de datos ese mes
# 2. Usamos operación window para, dentro de cada grupo edad, hacer un ranking por consumo de datos (descendente)
# 3. Nos quedamos sólo con los clientes que estén entre las 3 primeras posiciones
df_top_consumo = df_resumen.groupBy("grupo_edad","id_cliente").agg(sum("consumo_datos_MB")) \
                                            .withColumn("ranking_consumo", row_number().over(window_grupo)) \
                                            .filter(col("ranking_consumo") <= 3) 

df_top_consumo.show()

# Nos quedamos sólo con el listado de id_clientes
df_top_consumo = df_top_consumo.select("id_cliente")

+----------+----------+---------------------+---------------+
|grupo_edad|id_cliente|sum(consumo_datos_MB)|ranking_consumo|
+----------+----------+---------------------+---------------+
|         1|        22|                50398|              1|
|         1|        17|                49512|              2|
|         1|        11|                39777|              3|
|         2|        29|                48922|              1|
|         2|        21|                45384|              2|
|         2|        25|                41831|              3|
|         3|        28|                22768|              1|
|         3|        15|                20359|              2|
|         3|        13|                19973|              3|
|         4|        30|                 9019|              1|
|         4|         3|                 5474|              2|
|         4|        23|                 4502|              3|
+----------+----------+---------------------+---------------+



In [0]:


# En esta celda de código, una vez sabemos sobre qué clientes tenemos que operar, obtendremos las fechas en las que se sobrepasan los 20 GB de datos, y el consumo total de estos 3 clientes.

window_grupo = Window.partitionBy("grupo_edad").orderBy(asc("grupo_edad"), asc("fecha"))

# Con los siguientes métodos:
# 1. Con el join nos quedamos sólo con los clientes top 3 de cada grupo, con groupby y agg calculamos el consumo conjunto de este top 3 cada día del mes
# 2. Nueva columna donde se vaya calculando el consumo en lo que va de mes
# 3. Nueva columna con el consumo total para todo el mes (siempre refiriéndonos a la agrupación por grupo edad y top3 clientes)
# 4. Nueva columna 0/1 indicando si el consumo en lo que va de mes sobrepasa 20 GB o no
# 5. Nueva columna para registrar el día exacto en el que se sobrepasan los 20 GB
# 6. Nueva columna indicando el día del mes
df_resumen = df_resumen.join(df_top_consumo, "id_cliente", "inner").groupBy("grupo_edad","fecha").agg(sum("consumo_datos_MB").alias("consumo_diario")) \
                                                        .withColumn("acumulado_mes", sum("consumo_diario").over(window_grupo.rowsBetween(Window.unboundedPreceding,Window.currentRow))) \
                                                        .withColumn("total_mes", sum("consumo_diario").over(window_grupo.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))) \
                                                        .withColumn("no_sobrepasa_20GB", when(col("acumulado_mes") < 20*1024, lit(1)).otherwise(lit(0))) \
                                                        .withColumn("dia_20GB", (sum("no_sobrepasa_20GB").over(window_grupo.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)))+lit(1)) \
                                                        .withColumn("dia_mes", row_number().over(window_grupo))

df_resumen.show(40)

+----------+----------+--------------+-------------+---------+-----------------+--------+-------+
|grupo_edad|     fecha|consumo_diario|acumulado_mes|total_mes|no_sobrepasa_20GB|dia_20GB|dia_mes|
+----------+----------+--------------+-------------+---------+-----------------+--------+-------+
|         1|2020-08-01|          6185|         6185|   139687|                1|       4|      1|
|         1|2020-08-02|          5854|        12039|   139687|                1|       4|      2|
|         1|2020-08-03|          4018|        16057|   139687|                1|       4|      3|
|         1|2020-08-04|          4864|        20921|   139687|                0|       4|      4|
|         1|2020-08-05|          5526|        26447|   139687|                0|       4|      5|
|         1|2020-08-06|          4818|        31265|   139687|                0|       4|      6|
|         1|2020-08-07|          5359|        36624|   139687|                0|       4|      7|
|         1|2020-08-

In [0]:
# Con esos datos calculados, llegamos al df que se nos pide el enunciado
# Nueva columna para quedarnos sólo con los 4 registros que nos interesan:
    # condición when para quedarnos con la fecha del mes en la que se sobrepasan los 20 GB
    # condición when para controlar aquellos grupos de edad que no sobrepasen los 20 GB, nos quedamos con el registro del día 31
    # otherwise, restos de registros los marcamos con el flag "descartar"
# Filtramos conforme a los valores de esta columna recién generada para quedarnos con los registros que nos interesan
# Seleccionamos las columnas que nos pide el enunciado

df_resumen = df_resumen.withColumn("fecha_20GB", when(col("dia_20GB") == col("dia_mes"), col("fecha"))
                                                 .when((col("dia_20GB") == 32) & (col("dia_mes") == 31), None)
                                                 .otherwise(lit("descartar"))) \
                        .filter(col("fecha_20GB").isNull() | (col("fecha_20GB") != "descartar")) \
                        .select("grupo_edad", "fecha_20GB", "total_mes") 

df_resumen.show()

+----------+----------+---------+
|grupo_edad|fecha_20GB|total_mes|
+----------+----------+---------+
|         1|2020-08-04|   139687|
|         2|2020-08-05|   136137|
|         3|2020-08-10|    63100|
|         4|      null|    18995|
+----------+----------+---------+

