# **Reto II**

### 1. Dataset

Los datos de origen son proporcionados en un archivos csv:

* udfs: dataset con datos de operaciones financieras.

### 2. Columnas y significado:

* nb: número de referencia de la operación.
* contract: identificador de contrato.
* udf_ref: identificador de operación de trading.
* fmly: familia a la que pertenece la operación financiera.
* grp: grupo al que pertenece la operación financiera.
* type: tipo de operación financiera.
* country: país de origen de la operación.
* udf_name: campo informado en el registro.
* num_value: valor numérico.
* string_value: valor de cadena de caracteres.
* date_value: valor de fecha.
* data_timestamp_part: marca temporal.
* data_date_part: fecha en la que se almacena la información.
* source_system: fuente de los datos.

### 3. Descripción del problema:

Si hacemos una visión general a nuestro conjunto de datos, podemos observar como hay hasta 10 registros (filas) para cada valor de *nb*, donde cada registro solo da información para un valor de *udf_name*. Esto es un gasto innecesario de almacenamiento y computación, además de complicar los futuros cálculos derivados de estos datos. Por esta razón, necesitamos convertir estos registros con el mismo *nb* a un solo registro.

Nuestro dataframe final tendrá que contener las siguientes columnas: `nb, M_CCY, M_CLIENT, M_CRDTCHRG, M_DIRECTIAV, M_DISCMARGIN, M_LIQDTYCHRG, M_MVA, M_RVA, M_SELLER, M_SUCURSAL`

* nb: debe contener el número de referencia de la operación.
* M_CLIENT, M_SELLER, M_CCY, M_SUCURSAL: deben mapear el valor de *string_value*
* M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA: deben mapear el valor de *num_value*


Una vez tengamos este resultado, necesitaremos eliminar las operaciones que no tengan informados ninguno de los siguientes campos:

M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA, M_SELLER

No informados en este caso significa que o son valores nulos, vacíos o 0, en el caso de los campos numéricos.

### 4. Reto:

* Obtener un dataframe final que contenga las columnas indicadas, con un registro por *nb* y con los valores correctos mapeados.
* Las operaciones con los campos M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, , M_MVA, M_RVA, M_SELLER no informados no deben existir.
* Hacerlo de la manera más eficiente posible a nivel computacional.

**NOTA:** Cada uno de los pasos descritos en el problema pueden efectuarse en una sola línea.

### Inicialización de SparkSession:

In [1]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://L2108019.bosonit.local:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1636015064576)
SparkSession available as 'spark'


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import spark.implicits._


In [2]:
val spark = SparkSession.builder()
                        .appName("Reto 2")
                        .master("local")
                        .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@279df0b8


In [21]:
val udfs = spark.read.format("csv")
                     .option("header", "true")
                     .option("delimiter", ";")
                     .option("treatEmptyValuesAsNulls","true")
                     .option("nullValue", null)
                     .load("Desktop/Big Data/Retos Big Data/Spark for Data Engineers 2/reto2/udfs.csv")

udfs: org.apache.spark.sql.DataFrame = [nb: string, contract: string ... 12 more fields]


In [30]:
var aux1 = List("M_CLIENT", "M_SELLER", "M_CCY", "M_SUCURSAL")
var aux2 = List("M_DISCMARGIN", "M_DIRECTIAV", "M_LIQDTYCHRG", "M_CRDTCHRG", "M_MVA", "M_RVA")
var aux3 = aux1.++(aux2)

aux1: List[String] = List(M_CLIENT, M_SELLER, M_CCY, M_SUCURSAL)
aux2: List[String] = List(M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA)
aux3: List[String] = List(M_CLIENT, M_SELLER, M_CCY, M_SUCURSAL, M_DISCMARGIN, M_DIRECTIAV, M_LIQDTYCHRG, M_CRDTCHRG, M_MVA, M_RVA)


In [19]:
var data = udfs.where(col("udf_name").isin(aux3: _*)) // udfs.count() -> 386, data.count() -> 385
data = data.select("nb", "udf_name", "num_value", "string_value")

var pivot = data.withColumn("num_value", col("num_value").cast(IntegerType)).groupBy("nb")
                .pivot("udf_name")
                .agg(first("num_value"), first("string_value"))

pivot = pivot.drop("udf_name", "M_CLIENT_first(num_value)", "M_CCY_first(num_value)", "M_SUCURSAL_first(num_value)", 
                   "M_DISCMARGIN_first(string_value)", "M_DIRECTIAV_first(string_value)", "M_LIQDTYCHRG_first(string_value)", 
                   "M_MVA_first(string_value)", "M_RVA_first(string_value)", "M_SELLER_first(num_value)", "M_CRDTCHRG_first(string_value)")

pivot = pivot.withColumnRenamed("M_CCY_first(string_value)", "M_CCY")
             .withColumnRenamed("M_CLIENT_first(string_value)", "M_CLIENT")
             .withColumnRenamed("M_CRDTCHRG_first(num_value)", "M_CRDTCHRG")
             .withColumnRenamed("M_DIRECTIAV_first(num_value)", "M_DIRECTIAV")
             .withColumnRenamed("M_DISCMARGIN_first(num_value)", "M_DISCMARGIN")
             .withColumnRenamed("M_LIQDTYCHRG_first(num_value)", "M_LIQDTYCHRG")
             .withColumnRenamed("M_MVA_first(num_value)", "M_MVA")
             .withColumnRenamed("M_RVA_first(num_value)", "M_RVA")
             .withColumnRenamed("M_SELLER_first(string_value)", "M_SELLER")
             .withColumnRenamed("M_SUCURSAL_first(string_value)", "M_SUCURSAL")

val resultado = pivot.where((col("M_SELLER").isNotNull && upper(col("M_SELLER")) =!= "NULL") || 
                            (col("M_DISCMARGIN").isNotNull && upper(col("M_DISCMARGIN")) =!= "NULL" && col("M_DISCMARGIN") =!= 0) || 
                            (col("M_DIRECTIAV").isNotNull && col("M_DIRECTIAV") =!= 0 && upper(col("M_DIRECTIAV")) =!= "NULL") ||
                            (col("M_LIQDTYCHRG").isNotNull && col("M_LIQDTYCHRG") =!= 0 && upper(col("M_LIQDTYCHRG")) =!= "NULL") ||
                            (col("M_CRDTCHRG").isNotNull && col("M_CRDTCHRG") =!= 0 && upper(col("M_CRDTCHRG")) =!= "NULL") ||
                            (col("M_MVA").isNotNull && col("M_MVA") =!= 0 && upper(col("M_MVA")) =!= "NULL") ||
                            (col("M_RVA").isNotNull && col("M_RVA") =!= 0 && upper(col("M_RVA")) =!= "NULL"))

60
+--------+-----+--------+----------+-----------+------------+------------+-----+-----+--------+----------+
|      nb|M_CCY|M_CLIENT|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|M_MVA|M_RVA|M_SELLER|M_SUCURSAL|
+--------+-----+--------+----------+-----------+------------+------------+-----+-----+--------+----------+
|10000001| null|    NULL|        20|          0|          10|          30|    0|    0| SELLER1|      1999|
|10000009| null|    NULL|        20|          0|          10|          30|    0|    0| SELLER9|      1999|
|11122...| null|    MMMM|         0|       9056|           0|           0|   20| 8956|  VVVVVV|      1212|
|14773283| null|    NULL|        10|         10|         200|          10|   10|    5|    null|      5493|
|16719306|  USD|    null|      null|          0|        null|        null| null| null|    AMAM|      null|
|18343978|  GBP|    CCMO|         0|          0|          10|           0|   20|    0|LB_TL...|      1999|
|18710605|  MXN|    null|      nul

data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [nb: string, udf_name: string ... 2 more fields]
data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [nb: string, udf_name: string ... 2 more fields]
pivot: org.apache.spark.sql.DataFrame = [nb: string, M_CCY: string ... 9 more fields]
pivot: org.apache.spark.sql.DataFrame = [nb: string, M_CCY: string ... 9 more fields]
pivot: org.apache.spark.sql.DataFrame = [nb: string, M_CCY: string ... 9 more fields]
resultado: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [nb: string, M_CCY: string ... 9 more fields]


In [33]:
//Quicker way to do it

var pivot = udfs.where(col("udf_name").isin(aux3: _*))
                .groupBy("nb")
                .pivot("udf_name")
                .agg(first(when($"udf_name".isin(aux2: _*), $"num_value").otherwise($"string_value")))

val resultado = pivot.where((col("M_SELLER").isNotNull && upper(col("M_SELLER")) =!= "NULL") || 
                            (col("M_DISCMARGIN").isNotNull && upper(col("M_DISCMARGIN")) =!= "NULL" && col("M_DISCMARGIN") =!= 0) || 
                            (col("M_DIRECTIAV").isNotNull && col("M_DIRECTIAV") =!= 0 && upper(col("M_DIRECTIAV")) =!= "NULL") ||
                            (col("M_LIQDTYCHRG").isNotNull && col("M_LIQDTYCHRG") =!= 0 && upper(col("M_LIQDTYCHRG")) =!= "NULL") ||
                            (col("M_CRDTCHRG").isNotNull && col("M_CRDTCHRG") =!= 0 && upper(col("M_CRDTCHRG")) =!= "NULL") ||
                            (col("M_MVA").isNotNull && col("M_MVA") =!= 0 && upper(col("M_MVA")) =!= "NULL") ||
                            (col("M_RVA").isNotNull && col("M_RVA") =!= 0 && upper(col("M_RVA")) =!= "NULL"))

println(resultado.count)
resultado.orderBy(asc("nb")).show(61,8)

60
+--------+-----+--------+----------+-----------+------------+------------+--------+--------+--------+----------+
|      nb|M_CCY|M_CLIENT|M_CRDTCHRG|M_DIRECTIAV|M_DISCMARGIN|M_LIQDTYCHRG|   M_MVA|   M_RVA|M_SELLER|M_SUCURSAL|
+--------+-----+--------+----------+-----------+------------+------------+--------+--------+--------+----------+
|10000001| null|    NULL|        20|   0.000...|          10|          30|0.000...|0.000...| SELLER1|      1999|
|10000009| null|    NULL|        20|   0.000...|          10|          30|0.000...|0.000...| SELLER9|      1999|
|11122...| null|    MMMM|  0.000...|       9056|    0.000...|    0.000...|      20|    8956|  VVVVVV|      1212|
|14773283| null|    NULL|        10|         10|         200|          10|      10|       5|    null|      5493|
|16719306|  USD|    null|      null|   0.000...|        null|        null|    null|    null|    AMAM|      null|
|18343978|  GBP|    CCMO|  0.000...|   0.000...|          10|    0.000...|      20|0.000...|L

pivot: org.apache.spark.sql.DataFrame = [nb: string, M_CCY: string ... 9 more fields]
resultado: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [nb: string, M_CCY: string ... 9 more fields]


### Resultado:

**INSTRUCCIONES**: El DataFrame resultante debe almacenarse en la variable `resultado`, sustituyendo el valor `None` por el código que consideréis oportuno. De esta forma podréis comprobar si el resultado es correcto.

In [35]:
//val resultado = None

In [34]:
assert(resultado.count() == 60)
assert(resultado.columns.size == 11)
assert(resultado.columns(4) == "M_DIRECTIAV")
assert(resultado.select("M_SELLER").filter($"nb" === 23037162).first.getString(0) == "AMAM")
assert(resultado.select("M_SELLER").filter($"nb" === 19665186).first.getString(0) == "LB_VSTAVRE")
assert(resultado.select("M_RVA").filter($"nb" === 444111222).first.getString(0) == "8956")