# Importamos el spark session y creamos nuestra app name

In [2]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("SparkByExamples").getOrCreate()

# Comandos utiles para programar en pandas

**filter()** se usa para devolver el marco de datos en función de la condición dada al eliminar las filas en el marco de datos o al extraer las filas o columnas particulares del marco de datos. Filtraremos el marco de datos en varias columnas. Puede tomar una condición y devolver el marco de datos.

Sintaxis:
filter (condición de dataframe.column)

**join()** es el proceso de tomar datos de varias tablas y colocarlos en una vista generada.

Sintaxis:
DataFrame().join(other, on=None, how=None)
otro: otro df
how: inner,cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti

**show()** es la operación para mostrar un dataframe

**printSchema()** es la operación para acceder al schema de cada dataFrame

# Tablas de datos de clientes activos
La tabla "t_active_customers" contiene información sobre el estatus del cliente dentro del banco

In [5]:
# Creamos la tabla
data = [("98735029", "A", "A"), ("92028383", "NA", ""), ("00012345", "A", ""), ("00735398", "A", "A")]
schema = ['customer_id', 'customer_status', 'debit_card_status']
t_active_customers = spark.createDataFrame(data, schema)

# Mostramos la tabla
t_active_customers.show()

+-----------+---------------+-----------------+
|customer_id|customer_status|debit_card_status|
+-----------+---------------+-----------------+
|   98735029|              A|                A|
|   92028383|             NA|                 |
|   00012345|              A|                 |
|   00735398|              A|                A|
+-----------+---------------+-----------------+



# Tablas de datos de tipo de producto
La tabla "t_product_bank" contiene información del tipo de producto con oferta en el sistema del banco

In [8]:
# Creamos la tabla
data = [("98735029", "adelanto_nomina"), ("92028383", "tdc_model"),\
         ("00112345", "credit_card"), ("00735398", "mortage_model")]
schema = ['customer_id', 'product_type']
t_product_bank = spark.createDataFrame(data, schema)

# Mostramos la tabla
t_product_bank.show()

+-----------+---------------+
|customer_id|   product_type|
+-----------+---------------+
|   98735029|adelanto_nomina|
|   92028383|      tdc_model|
|   00112345|    credit_card|
|   00735398|  mortage_model|
+-----------+---------------+



# Tablas de datos de tipo de producto y la respuesta de la campaña
La tabla "t_product_offer" contiene información del tipo de producto con oferta en el sistema del banco y la respuesta de la oferta del mismo, en caso de ser vacía no se ha enviado la campaña

In [11]:
# Creamos la tabla
data = [("98735029", "tdc_model", "yes"), ("92028383", "tdc_model", "yes"),\
         ("00112345", "credit_card", ""), ("00735398", "mortage_model", "")]
schema = ['customer_id', 'product_type', 'campaign_response']
t_product_offer = spark.createDataFrame(data, schema)

# Mostramos la tabla
t_product_offer.show()

+-----------+-------------+-----------------+
|customer_id| product_type|campaign_response|
+-----------+-------------+-----------------+
|   98735029|    tdc_model|              yes|
|   92028383|    tdc_model|              yes|
|   00112345|  credit_card|                 |
|   00735398|mortage_model|                 |
+-----------+-------------+-----------------+



# Solución

In [15]:
# Necesitamos los clientes sin respuesta en la campaña

In [12]:
t_product_offer_em = t_product_offer.filter(F.col('campaign_response') == "")

In [14]:
t_product_offer_em.show()

+-----------+-------------+-----------------+
|customer_id| product_type|campaign_response|
+-----------+-------------+-----------------+
|   00112345|  credit_card|                 |
|   00735398|mortage_model|                 |
+-----------+-------------+-----------------+



In [None]:
# Unimos ambos data frames en el cliente y producto

In [24]:
l_keys = ['customer_id','product_type']
t_product_customer = t_product_offer_em.join(t_product_bank, on = l_keys, how = "left")

In [25]:
t_product_customer.show()

+-----------+-------------+-----------------+
|customer_id| product_type|campaign_response|
+-----------+-------------+-----------------+
|   00112345|  credit_card|                 |
|   00735398|mortage_model|                 |
+-----------+-------------+-----------------+



In [None]:
# Obtenemos el dataframe de los clientes activos

In [17]:
t_active_customers_A = t_active_customers.filter(F.col("customer_status") == "A")

In [18]:
t_active_customers_A.show()

+-----------+---------------+-----------------+
|customer_id|customer_status|debit_card_status|
+-----------+---------------+-----------------+
|   98735029|              A|                A|
|   00012345|              A|                 |
|   00735398|              A|                A|
+-----------+---------------+-----------------+



In [None]:
# left join con clientes activos

In [26]:
final = t_active_customers_A.join(t_product_customer, on='customer_id', how='left')

In [31]:
final.select(F.col("customer_id"), F.col("product_type"))\
     .filter(F.col("product_type").isNotNull()).show()

+-----------+-------------+
|customer_id| product_type|
+-----------+-------------+
|   00735398|mortage_model|
+-----------+-------------+

