# *Stage 3: Feature Engineering (transactions.json)*

At the end of processing, each output table should contain the following variables:

## transactions_received:
* ``client_id``: client ID
* ``offer_id``: offer ID
* ``time_received``: sorted array of times, in days since the start of the test (t=0), at which the client received the offer
* ``qtd_offer_received``: number of times the client received the offer

## transactions_viewed:
* ``client_id``: client ID
* ``offer_id``: offer ID
* ``time_viewed``: sorted array of times, in days since the start of the test (t=0), at which the client viewed the offer
* ``qtd_offer_viewed``: number of times the client viewed the offer


## transactions_completed:
* ``client_id``: client ID
* ``offer_id``: offer ID
* ``time_completed``: sorted array of times, in days since the start of the test (t=0), at which the client completed the offer
* ``qtd_offer_completed``: number of times the client completed the offer
* ``sum_reward``: total reward (discount) summed over those completions

## client_transactions:
* ``client_id``: client ID
* ``time_transaction``: sorted array of transaction times, in days since the start of the test (t=0)
* ``qtd_transaction``: number of transactions
* ``total_amount``: total amount across the client's transactions
* ``avg_amount``: average transaction amount

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkTest').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/11 00:22:49 WARN Utils: Your hostname, N0L144853, resolves to a loopback address: 127.0.1.1; using 192.168.68.107 instead (on interface wlp0s20f3)
25/08/11 00:22:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 00:22:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/11 00:22:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/08/11 00:22:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/08/11 00:22:49 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [2]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Read Files

In [3]:
path_json = "../data/raw/transactions.json"
output_path_client_transactions = "../data/trusted/client_transactions"
output_path_transactions_received = "../data/trusted/transactions_received"
output_path_transactions_viewed = "../data/trusted/transactions_viewed"
output_path_transactions_completed = "../data/trusted/transactions_completed"


schema = T.StructType([
    T.StructField("event", T.StringType(), True),
    T.StructField("account_id", T.StringType(), True),
    T.StructField("time_since_test_start", T.DoubleType(), True),
    T.StructField("value", T.MapType(T.StringType(), T.StringType()), True) 
])

df_transactions = spark.read.schema(schema).json(path_json)

df_transactions = df_transactions.withColumn("offer id", F.col("value").getItem("offer id")) \
                                 .withColumn("offer_id", F.col("value").getItem("offer_id")) \
                                 .withColumn("reward", F.col("value").getItem("reward").cast(T.DoubleType())) \
                                 .withColumn("amount", F.col("value").getItem("amount").cast(T.DoubleType())) \
                                 .select(F.col('event'), F.col('account_id'), F.col('time_since_test_start'), F.col('offer id'), F.col('offer_id'), F.col('reward'), F.col('amount'))

df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+
|event         |account_id                      |time_since_test_start|offer id                        |offer_id|reward|amount|
+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |9b98b8c7a33c4b65b9aebfe6a799e6d9|NULL    |NULL  |NULL  |
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |0b1e1539f2cc45b7b9fa7c272da2e1d7|NULL    |NULL  |NULL  |
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |2906b810c7d4411798c6938adc9daaa5|NULL    |NULL  |NULL  |
|offer received|8ec6ce2a7e7949b1bf142def7d0e0586|0.0                  |fafdcd668e3743c1bb461111dcafc2a4|NULL    |NULL  |NULL  |
|offer received|68617ca6246f4fbc85e91a2a49552598|0.0                  |4d5c57ea9a6940dd891ad53e9dbe8da0|

                                                                                

In [4]:
df_transactions.describe().show()

25/08/11 00:22:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+---------------+--------------------+---------------------+--------------------+--------------------+------------------+------------------+
|summary|          event|          account_id|time_since_test_start|            offer id|            offer_id|            reward|            amount|
+-------+---------------+--------------------+---------------------+--------------------+--------------------+------------------+------------------+
|  count|         306534|              306534|               306534|              134002|               33579|             33579|            138953|
|   mean|           NULL|2.565638242424101E31|   15.265955815668082|                NULL|                NULL| 4.904136513892611| 12.77735615639814|
| stddev|           NULL|                 0.0|    8.346929730115084|                NULL|                NULL|2.8866468823372804|30.250528632017126|
|    min|offer completed|0009655768c64bdeb...|                  0.0|0b1e1539f2cc45b7b...|0b1e1539f2cc45b7b

                                                                                

In [5]:
df_transactions.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_transactions.columns]).show()



+-----+----------+---------------------+--------+--------+------+------+
|event|account_id|time_since_test_start|offer id|offer_id|reward|amount|
+-----+----------+---------------------+--------+--------+------+------+
|    0|         0|                    0|  172532|  272955|272955|167581|
+-----+----------+---------------------+--------+--------+------+------+



                                                                                

## Considerations:
 - Merge the two offer ID columns (``offer id`` and ``offer_id``), since no record has both populated
 - Check the event types
 - Check the null columns


In [6]:
df_transactions.groupBy("event").count().show()

+---------------+------+
|          event| count|
+---------------+------+
|    transaction|138953|
| offer received| 76277|
|offer completed| 33579|
|   offer viewed| 57725|
+---------------+------+
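
The counts printed so far can be cross-checked with simple arithmetic. The sketch below hard-codes the figures from the outputs above (it does not query Spark) and checks that they agree with one another. The reading given in the last comment, that ``offer id`` is filled on received and viewed events while ``offer_id`` is filled on completed events, is an inference from the matching totals, not something verified row by row.

```python
# Figures copied from the notebook outputs above; nothing is recomputed from the data.
total_rows = 306534
event_counts = {
    "transaction": 138953,
    "offer received": 76277,
    "offer completed": 33579,
    "offer viewed": 57725,
}
non_null = {"offer id": 134002, "offer_id": 33579, "reward": 33579, "amount": 138953}
null_counts = {"offer id": 172532, "offer_id": 272955, "reward": 272955, "amount": 167581}

# Every row belongs to exactly one event type.
assert sum(event_counts.values()) == total_rows

# Non-null and null counts add up to the row total for each column.
for column, filled in non_null.items():
    assert filled + null_counts[column] == total_rows

# Consistent with "offer id" being populated on received/viewed events,
# "offer_id" (and reward) on completed events, and amount on transactions.
assert non_null["offer id"] == event_counts["offer received"] + event_counts["offer viewed"]
assert non_null["offer_id"] == event_counts["offer completed"]
assert non_null["amount"] == event_counts["transaction"]
```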



# Data Cleaning

In [7]:
df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+
|event         |account_id                      |time_since_test_start|offer id                        |offer_id|reward|amount|
+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |9b98b8c7a33c4b65b9aebfe6a799e6d9|NULL    |NULL  |NULL  |
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |0b1e1539f2cc45b7b9fa7c272da2e1d7|NULL    |NULL  |NULL  |
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |2906b810c7d4411798c6938adc9daaa5|NULL    |NULL  |NULL  |
|offer received|8ec6ce2a7e7949b1bf142def7d0e0586|0.0                  |fafdcd668e3743c1bb461111dcafc2a4|NULL    |NULL  |NULL  |
|offer received|68617ca6246f4fbc85e91a2a49552598|0.0                  |4d5c57ea9a6940dd891ad53e9dbe8da0|

##### Unifying offer_id

In [8]:
df_transactions = df_transactions.withColumn("offer_id_final",F.coalesce(F.col("offer_id"), F.col("offer id")))
df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+--------------------------------+
|event         |account_id                      |time_since_test_start|offer id                        |offer_id|reward|amount|offer_id_final                  |
+--------------+--------------------------------+---------------------+--------------------------------+--------+------+------+--------------------------------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |9b98b8c7a33c4b65b9aebfe6a799e6d9|NULL    |NULL  |NULL  |9b98b8c7a33c4b65b9aebfe6a799e6d9|
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |0b1e1539f2cc45b7b9fa7c272da2e1d7|NULL    |NULL  |NULL  |0b1e1539f2cc45b7b9fa7c272da2e1d7|
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |2906b810c7d4411798c6938adc9daaa5|NULL    |NULL  |NULL  |2906b810c7d4411798c6938adc9daaa5|
|offer received|8ec6ce2a7e7949b1bf
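
To make the merge rule concrete, here is a plain-Python sketch (independent of Spark) of what `F.coalesce` does for each row: it returns the first argument that is not null. The helper `coalesce` and the sample rows are hypothetical and exist only for illustration; the two offer IDs are borrowed from the output above, and the third row stands in for an event that carries no offer ID at all.

```python
from typing import Optional


def coalesce(*values: Optional[str]) -> Optional[str]:
    """Return the first value that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


# Hypothetical rows covering the three cases: only one key set, or neither.
rows = [
    {"offer id": "9b98b8c7a33c4b65b9aebfe6a799e6d9", "offer_id": None},
    {"offer id": None, "offer_id": "0b1e1539f2cc45b7b9fa7c272da2e1d7"},
    {"offer id": None, "offer_id": None},
]

# Same argument order as the notebook cell: "offer_id" first, then "offer id".
offer_id_final = [coalesce(row["offer_id"], row["offer id"]) for row in rows]
```

Given the earlier observation that no record has both columns populated, the order of the arguments should not change the result on this dataset.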

In [9]:
df_transactions = df_transactions.drop("offer id", "offer_id")\
                                 .withColumnRenamed("offer_id_final", "offer_id")\
                                 .withColumnRenamed("account_id", "client_id")
df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+------+------+--------------------------------+
|event         |client_id                       |time_since_test_start|reward|amount|offer_id                        |
+--------------+--------------------------------+---------------------+------+------+--------------------------------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |NULL  |NULL  |9b98b8c7a33c4b65b9aebfe6a799e6d9|
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |NULL  |NULL  |0b1e1539f2cc45b7b9fa7c272da2e1d7|
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |NULL  |NULL  |2906b810c7d4411798c6938adc9daaa5|
|offer received|8ec6ce2a7e7949b1bf142def7d0e0586|0.0                  |NULL  |NULL  |fafdcd668e3743c1bb461111dcafc2a4|
|offer received|68617ca6246f4fbc85e91a2a49552598|0.0                  |NULL  |NULL  |4d5c57ea9a6940dd891ad53e9dbe8da0|
+--------------+--------------------------------

                                                                                

##### Transforming columns

In [10]:
df_transactions = df_transactions.fillna({"amount": 0, "reward": 0})
df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+------+------+--------------------------------+
|event         |client_id                       |time_since_test_start|reward|amount|offer_id                        |
+--------------+--------------------------------+---------------------+------+------+--------------------------------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |0.0   |0.0   |9b98b8c7a33c4b65b9aebfe6a799e6d9|
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |0.0   |0.0   |0b1e1539f2cc45b7b9fa7c272da2e1d7|
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |0.0   |0.0   |2906b810c7d4411798c6938adc9daaa5|
|offer received|8ec6ce2a7e7949b1bf142def7d0e0586|0.0                  |0.0   |0.0   |fafdcd668e3743c1bb461111dcafc2a4|
|offer received|68617ca6246f4fbc85e91a2a49552598|0.0                  |0.0   |0.0   |4d5c57ea9a6940dd891ad53e9dbe8da0|
+--------------+--------------------------------

In [11]:
distinct_events = [ev.event for ev in df_transactions.select('event').distinct().collect()]
distinct_events

['transaction', 'offer received', 'offer completed', 'offer viewed']

In [12]:
for event in distinct_events:
    df_transactions = df_transactions.withColumn(event.replace(" ", "_"), F.when(F.col("event") == event, 1).otherwise(0))
df_transactions.show(5, truncate=False)

+--------------+--------------------------------+---------------------+------+------+--------------------------------+-----------+--------------+---------------+------------+
|event         |client_id                       |time_since_test_start|reward|amount|offer_id                        |transaction|offer_received|offer_completed|offer_viewed|
+--------------+--------------------------------+---------------------+------+------+--------------------------------+-----------+--------------+---------------+------------+
|offer received|78afa995795e4d85b5d9ceeca43f5fef|0.0                  |0.0   |0.0   |9b98b8c7a33c4b65b9aebfe6a799e6d9|0          |1             |0              |0           |
|offer received|a03223e636434f42ac4c3df47e8bac43|0.0                  |0.0   |0.0   |0b1e1539f2cc45b7b9fa7c272da2e1d7|0          |1             |0              |0           |
|offer received|e2127556f4f64592b11af22de27a7932|0.0                  |0.0   |0.0   |2906b810c7d4411798c6938adc9daaa5|0      
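
The loop above derives one 0/1 indicator column per event type and names each column by replacing the spaces in the event name with underscores. A plain-Python sketch of the same rule, using the event names listed above and a hypothetical helper `add_indicators` that is not part of the notebook:

```python
distinct_events = ["transaction", "offer received", "offer completed", "offer viewed"]


def add_indicators(row: dict) -> dict:
    """Return a copy of the row with one 0/1 flag per known event type."""
    flagged = dict(row)
    for event in distinct_events:
        # "offer received" becomes the column name "offer_received", and so on.
        flagged[event.replace(" ", "_")] = 1 if row["event"] == event else 0
    return flagged


flagged_row = add_indicators({"event": "offer received"})
```

Exactly one flag is set per row, which is what lets the later cells select a single event type with filters such as `offer_received=1`.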

##### Generating clean tables for each event type

In [13]:
df_transactions = df_transactions.select("client_id", 'offer_id', 'time_since_test_start', "reward", "amount", 'offer_received', 'offer_viewed', 'offer_completed', 'transaction')
df_transactions.show(5, truncate=False)

+--------------------------------+--------------------------------+---------------------+------+------+--------------+------------+---------------+-----------+
|client_id                       |offer_id                        |time_since_test_start|reward|amount|offer_received|offer_viewed|offer_completed|transaction|
+--------------------------------+--------------------------------+---------------------+------+------+--------------+------------+---------------+-----------+
|78afa995795e4d85b5d9ceeca43f5fef|9b98b8c7a33c4b65b9aebfe6a799e6d9|0.0                  |0.0   |0.0   |1             |0           |0              |0          |
|a03223e636434f42ac4c3df47e8bac43|0b1e1539f2cc45b7b9fa7c272da2e1d7|0.0                  |0.0   |0.0   |1             |0           |0              |0          |
|e2127556f4f64592b11af22de27a7932|2906b810c7d4411798c6938adc9daaa5|0.0                  |0.0   |0.0   |1             |0           |0              |0          |
|8ec6ce2a7e7949b1bf142def7d0e0586|fafdcd

                                                                                

In [14]:
# Offers received: one row per (client_id, offer_id) with sorted event times and a count.
df_offer_received = (
    df_transactions.filter("offer_received=1")
    .select("client_id", "offer_id", F.col("time_since_test_start").alias("time_received"), "offer_received")
    .groupBy("client_id", "offer_id")
    .agg(
        F.array_sort(F.collect_list("time_received")).alias("time_received"),
        F.count(F.col("offer_received")).alias("qtd_offer_received"),
    )
    .orderBy("client_id", "offer_id")
)

# Offers viewed: same shape as above, for view events.
df_offer_viewed = (
    df_transactions.filter("offer_viewed=1")
    .select("client_id", "offer_id", F.col("time_since_test_start").alias("time_viewed"), "offer_viewed")
    .groupBy("client_id", "offer_id")
    .agg(
        F.array_sort(F.collect_list("time_viewed")).alias("time_viewed"),
        F.count(F.col("offer_viewed")).alias("qtd_offer_viewed"),
    )
    .orderBy("client_id", "offer_id")
)

# Offers completed: also sums the reward over the completions.
df_offer_completed = (
    df_transactions.filter("offer_completed=1")
    .select("client_id", "offer_id", F.col("time_since_test_start").alias("time_completed"), "reward", "offer_completed")
    .groupBy("client_id", "offer_id")
    .agg(
        F.array_sort(F.collect_list("time_completed")).alias("time_completed"),
        F.count(F.col("offer_completed")).alias("qtd_offer_completed"),
        F.sum(F.col("reward")).alias("sum_reward"),
    )
    .orderBy("client_id", "offer_id")
)

# Transactions: one row per client with sorted times, count, total and average amount.
df_client_transactions = (
    df_transactions.filter("transaction=1")
    .select("client_id", "amount", F.col("time_since_test_start").alias("time_transaction"), "transaction")
    .groupBy("client_id")
    .agg(
        F.array_sort(F.collect_list("time_transaction")).alias("time_transaction"),
        F.count(F.col("transaction")).alias("qtd_transaction"),
        F.sum(F.col("amount")).alias("total_amount"),
        F.avg(F.col("amount")).alias("avg_amount"),
    )
    .orderBy("client_id")
)
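
Each of the four aggregations follows the same pattern: keep one event type, group by key, collect the event times into a sorted array, and count the rows. The plain-Python sketch below mirrors that pattern for completed offers on a few hypothetical rows (the IDs, times, and rewards are invented for illustration). It is meant only to show the shape of the result, not to replace the Spark code.

```python
from collections import defaultdict

# Hypothetical completed-offer rows: (client_id, offer_id, time_completed, reward).
rows = [
    ("client_a", "offer_1", 7.0, 5.0),
    ("client_a", "offer_1", 2.5, 5.0),
    ("client_b", "offer_2", 4.0, 2.0),
]

# Group the events by (client_id, offer_id).
groups = defaultdict(list)
for client_id, offer_id, time_completed, reward in rows:
    groups[(client_id, offer_id)].append((time_completed, reward))

# One output record per key, ordered by key as orderBy() does in the notebook.
transactions_completed = [
    {
        "client_id": client_id,
        "offer_id": offer_id,
        "time_completed": sorted(t for t, _ in events),  # array_sort(collect_list(...))
        "qtd_offer_completed": len(events),              # count(...)
        "sum_reward": sum(r for _, r in events),         # sum(reward)
    }
    for (client_id, offer_id), events in sorted(groups.items())
]
```

Sorting the collected times matters because `collect_list` does not guarantee any particular order of elements after a shuffle; `array_sort` makes the stored arrays deterministic.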

In [15]:
df_client_transactions.write.format("parquet").mode("overwrite").save(output_path_client_transactions)
df_offer_received.write.format("parquet").mode("overwrite").save(output_path_transactions_received)
df_offer_viewed.write.format("parquet").mode("overwrite").save(output_path_transactions_viewed)
df_offer_completed.write.format("parquet").mode("overwrite").save(output_path_transactions_completed)

                                                                                