# **Reto II**

### 1. Dataset

Los datos de origen son proporcionados en un archivos csv:

* udfs: dataset con datos de operaciones financieras.

### 2. Columnas y significado:

* nb: número de referencia de la operación.
* contract: identificador de contrato.
* udf_ref: identificador de operación de trading.
* fmly: familia a la que pertenece la operación financiera.
* grp: grupo al que pertenece la operación financiera.
* type: tipo de operación financiera.
* country: país de origen de la operación.
* udf_name: campo informado en el registro.
* num_value: valor numérico.
* string_value: valor de cadena de caracteres.
* date_value: valor de fecha.
* data_timestamp_part: marca temporal.
* data_date_part: fecha en la que se almacena la información.
* source_system: fuente de los datos.

### 3. Descripción del problema:

Si hacemos una visión general a nuestro conjunto de datos, podemos observar como hay hasta 10 registros (filas) para cada valor de *nb*, donde cada registro solo da información para un valor de *udf_name*. Esto es un gasto innecesario de almacenamiento y computación, además de complicar los futuros cálculos derivados de estos datos. Por esta razón, necesitamos convertir estos registros con el mismo *nb* a un solo registro.

Nuestro dataframe final tendrá que contener las siguientes columnas: `nb, M_CCY, M_CLIENT, M_CRDTCHRG, M_DIRECTIAV, M_DISCMARGIN, M_LIQDTYCHRG, M_MVA, M_RVA, M_SELLER, M_SUCURSAL`

* nb: debe contener el número de referencia de la operación.
* M_CLIENT, M_SELLER, M_CCY, M_SUCURSAL: deben mapear el valor de *string_value*
* M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, , M_MVA, M_RVA: deben mapear el valor de *num_value*


Una vez tengamos este resultado, necesitaremos eliminar las operaciones que no tengan informados ninguno de los siguientes campos:

M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA, M_SELLER

No informados en este caso significa que o son valores nulos, vacíos o 0, en el caso de los campos numéricos.

### 4. Reto:

* Obtener un dataframe final que contenga las columnas indicadas, con un registro por *nb* y con los valores correctos mapeados.
* Las operaciones con los campos M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, , M_MVA, M_RVA, M_SELLER no informados no deben existir.
* Hacerlo de la manera más eficiente posible a nivel computacional.

**NOTA:** Cada uno de los pasos descritos en el problema pueden efectuarse en una sola línea.

### Inicialización de SparkSession:

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

spark = (SparkSession.builder.appName("Reto 2").getOrCreate())

In [0]:
udfs = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", ";")\
                      .csv("dbfs:/FileStore/shared_uploads/paula.roman@bosonit.com/udfs.csv")
udfs.show(100)

+--------+----------+--------+----+-----+-----+-------+------------+-----------------+------------+----------+-------------------+-------------------+-------------+
|      nb|  contract| udf_ref|fmly|  grp| type|country|    udf_name|        num_value|string_value|date_value|data_timestamp_part|     data_date_part|source_system|
+--------+----------+--------+----+-----+-----+-------+------------+-----------------+------------+----------+-------------------+-------------------+-------------+
|  444444|      3333|28786653| IRD|LN_BR| null|    ESP|       M_CCY|             NULL|        null|      NULL|     20201128041303|2020-12-30 00:00:00|        Mx3EU|
| 2222222|   2222222| 2222222| IRD|  IRS| null|    ESP|  M_CRDTCHRG|  30.000000000000|        NULL|      NULL|     20210203032054|2020-12-30 00:00:00|        Mx3EU|
| 2222222|   2222222| 2222222| IRD|  IRS| null|    ESP|    M_SELLER|             NULL|  LB_TLECLER|      NULL|     20210203032054|2020-12-30 00:00:00|        Mx3EU|
| 2222222|

### Resultado:

**INSTRUCCIONES**: El DataFrame resultante debe almacenarse en la variable `resultado`, sustituyendo el valor `None` por el código que consideréis oportuno. De esta forma podréis comprobar si el resultado es correcto.

In [0]:
#Seleccionamos las columnas que nos interesan
udfs_selec_df = udfs.select("nb", "udf_name", "num_value", "string_value").withColumn("num_value", F.col("num_value").cast("double"))
udfs_selec_df.show()

+-------+------------+---------+------------+
|     nb|    udf_name|num_value|string_value|
+-------+------------+---------+------------+
| 444444|       M_CCY|     null|        null|
|2222222|  M_CRDTCHRG|     30.0|        NULL|
|2222222|    M_SELLER|     null|  LB_TLECLER|
|2222222|M_LIQDTYCHRG|     50.0|        NULL|
|2222222|       M_MVA|     20.0|        NULL|
|2222222|  M_SUCURSAL|     null|        1999|
|2222222|       M_RVA|      0.0|        NULL|
|2222222| M_DIRECTIAV|      0.0|        NULL|
|2222222|    M_CLIENT|     null|        CCMO|
|2222222|M_DISCMARGIN|     10.0|        NULL|
|2222222|       M_CCY|     null|         USD|
|3815982|M_DISCMARGIN|      0.0|        NULL|
|3815982|  M_SUCURSAL|     null|        null|
|3815982|       M_RVA|      0.0|        NULL|
|3815982|M_LIQDTYCHRG|      0.0|        NULL|
|3815982|  M_CRDTCHRG|      0.0|        NULL|
|3815982|       M_MVA|      0.0|        NULL|
|3815982|    M_SELLER|     null|        null|
|3815982|    M_CLIENT|     null|  

In [0]:
#Pivotamos por la columna udf_name
udfs_num_df = udfs_selec_df.groupBy("nb").pivot("udf_name").agg(F.sum("num_value")).orderBy("nb")
udfs_str_df = udfs_selec_df.groupBy("nb").pivot("udf_name").agg(F.first("string_value")).orderBy("nb")
udfs_num_df.show()
udfs_str_df.show()

+--------+-----+--------+----------+-----------+------------+------------+-----+--------+-----+--------+----------+
|      nb|M_CCY|M_CLIENT|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|M_MVA|M_PRUEBA|M_RVA|M_SELLER|M_SUCURSAL|
+--------+-----+--------+----------+-----------+------------+------------+-----+--------+-----+--------+----------+
|  444444| null|    null|      null|       null|        null|        null| null|    null| null|    null|      null|
| 2222222| null|    null|      30.0|        0.0|        10.0|        50.0| 20.0|    null|  0.0|    null|      null|
| 3815982| null|    null|       0.0|        0.0|         0.0|         0.0|  0.0|    null|  0.0|    null|      null|
| 8216817| null|    null|      null|        0.0|        null|        null| null|    null| null|    null|      null|
|10000001| null|    null|      20.0|        0.0|        10.0|        30.0|  0.0|    null|  0.0|    null|      null|
|10000009| null|    null|      20.0|        0.0|        10.0|        30.

In [0]:
#Nos quedamos solo con las columnas que nos interesan
udfs_num_clean = udfs_num_df.drop("M_CCY").drop("M_CLIENT").drop("M_PRUEBA").drop("M_SELLER").drop("M_SUCURSAL")
udfs_str_clean = udfs_str_df.drop("M_CRDTCHRG").drop("M_DIRECTIAV").drop("M_DISCMARGIN").drop("M_LIQDTYCHRG").drop("M_MVA").drop("M_RVA").drop("M_PRUEBA")
udfs_num_clean.show()
udfs_str_clean.show()

+--------+----------+-----------+------------+------------+-----+-----+
|      nb|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|M_MVA|M_RVA|
+--------+----------+-----------+------------+------------+-----+-----+
|  444444|      null|       null|        null|        null| null| null|
| 2222222|      30.0|        0.0|        10.0|        50.0| 20.0|  0.0|
| 3815982|       0.0|        0.0|         0.0|         0.0|  0.0|  0.0|
| 8216817|      null|        0.0|        null|        null| null| null|
|10000001|      20.0|        0.0|        10.0|        30.0|  0.0|  0.0|
|10000009|      20.0|        0.0|        10.0|        30.0|  0.0|  0.0|
|14773283|      10.0|       10.0|       200.0|        10.0| 10.0|  5.0|
|16719306|      null|        0.0|        null|        null| null| null|
|18343978|       0.0|        0.0|        10.0|         0.0| 20.0|  0.0|
|18710605|      null|        0.0|        null|        null| null| null|
|18710606|      null|        0.0|        null|        null| null

In [0]:
#Unimos los dos dataframes
udfs_joined_df = udfs_num_clean.join(udfs_str_clean, on = ["nb"])
udfs_joined_df.show(150)

+-----------+----------+-----------+------------+------------+-----+------+-----+--------+----------+----------+
|         nb|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|M_MVA| M_RVA|M_CCY|M_CLIENT|  M_SELLER|M_SUCURSAL|
+-----------+----------+-----------+------------+------------+-----+------+-----+--------+----------+----------+
|     444444|      null|       null|        null|        null| null|  null| null|    null|      null|      null|
|    2222222|      30.0|        0.0|        10.0|        50.0| 20.0|   0.0|  USD|    CCMO|LB_TLECLER|      1999|
|    3815982|       0.0|        0.0|         0.0|         0.0|  0.0|   0.0| null|    null|      null|      null|
|    8216817|      null|        0.0|        null|        null| null|  null|  EUR|    null|      AMAM|      null|
|   10000001|      20.0|        0.0|        10.0|        30.0|  0.0|   0.0| null|    NULL|   SELLER1|      1999|
|   10000009|      20.0|        0.0|        10.0|        30.0|  0.0|   0.0| null|    NULL|   SEL

In [0]:
#Una vez tengamos este resultado, necesitaremos eliminar las operaciones que no tengan informados ninguno de los siguientes campos: M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA, M_SELLER
resultado = udfs_joined_df.filter((F.col("M_DISCMARGIN") != 0.0) | (F.col("M_DIRECTIAV") != 0.0)\
                                  | (F.col("M_LIQDTYCHRG") != 0.0) | (F.col("M_CRDTCHRG") != 0.0)\
                                  | (F.col("M_MVA") != 0.0) | (F.col("M_RVA") != 0.0) | (F.col("M_SELLER").isNotNull()))

resultado = resultado.select("nb", "M_CCY", "M_CLIENT", "M_CRDTCHRG", "M_DIRECTIAV", "M_DISCMARGIN", "M_LIQDTYCHRG", "M_MVA", "M_RVA", "M_SELLER", "M_SUCURSAL")
resultado.show()

+--------+-----+--------+----------+-----------+------------+------------+-----+-----+----------+----------+
|      nb|M_CCY|M_CLIENT|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|M_MVA|M_RVA|  M_SELLER|M_SUCURSAL|
+--------+-----+--------+----------+-----------+------------+------------+-----+-----+----------+----------+
| 2222222|  USD|    CCMO|      30.0|        0.0|        10.0|        50.0| 20.0|  0.0|LB_TLECLER|      1999|
| 8216817|  EUR|    null|      null|        0.0|        null|        null| null| null|      AMAM|      null|
|10000001| null|    NULL|      20.0|        0.0|        10.0|        30.0|  0.0|  0.0|   SELLER1|      1999|
|10000009| null|    NULL|      20.0|        0.0|        10.0|        30.0|  0.0|  0.0|   SELLER9|      1999|
|14773283| null|    NULL|      10.0|       10.0|       200.0|        10.0| 10.0|  5.0|      null|      5493|
|16719306|  USD|    null|      null|        0.0|        null|        null| null| null|      AMAM|      null|
|18343978|  GBP|   

In [0]:
assert(resultado.count() == 60)
assert(len(resultado.columns) == 11)
assert(resultado.columns[4] == "M_DIRECTIAV")
assert(resultado.select("M_SELLER").filter(F.col("nb") == 23037162).collect()[0].__getitem__('M_SELLER') == 'AMAM')
assert(resultado.select("M_SELLER").filter(F.col("nb") == 19665186).collect()[0].__getitem__('M_SELLER') == "LB_VSTAVRE")
assert(resultado.select("M_RVA").filter(F.col("nb") == 444111222).collect()[0].__getitem__('M_RVA') == 8956)