In [1]:
import pyspark

In [2]:
from pyspark.sql.functions import lit

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()


In [4]:
sc = spark.sparkContext

<h2> Leitura dos arquivos JSON </h2>
<p> Os dados das tabelas foram todos armazenados em dois arquivos JSON. Um json que representa os dados da tabela Transações e outro para representar os dados da tabela contratos

In [48]:
df_transactions = spark.read.option("multiline", "true").json("pagoNxt_transactions.json")

In [49]:
df_transactions.show()

+---------+-------------------+------------+--------------+
|client_id|discount_percentage|total_amount|transaction_id|
+---------+-------------------+------------+--------------+
|     3545|               6.99|        3000|             1|
|     3545|               0.45|        4500|             2|
|     3509|                0.0|       69998|             3|
|     3510|               null|           1|             4|
|     4510|               40.0|          34|             5|
+---------+-------------------+------------+--------------+



In [50]:
df_transactions = df_transactions.withColumnRenamed('client_id', 'transactions_client_id')

In [51]:
df_transactions.show()

+----------------------+-------------------+------------+--------------+
|transactions_client_id|discount_percentage|total_amount|transaction_id|
+----------------------+-------------------+------------+--------------+
|                  3545|               6.99|        3000|             1|
|                  3545|               0.45|        4500|             2|
|                  3509|                0.0|       69998|             3|
|                  3510|               null|           1|             4|
|                  4510|               40.0|          34|             5|
+----------------------+-------------------+------------+--------------+



In [52]:
df_contracts = spark.read.option("multiline", "true").json("pagoNxt_contracts.json")

In [53]:
df_contracts.show()

+---------+---------------+-----------+---------+----------+
|client_id|    client_name|contract_id|is_active|percentage|
+---------+---------------+-----------+---------+----------+
|     3545| Magazine Luana|          3|     true|       2.0|
|     3545| Magazine Luana|          4|    false|      1.95|
|     3509|Lojas Italianas|          5|     true|       1.0|
|     3510|     Carrerfive|          6|     true|       3.0|
+---------+---------------+-----------+---------+----------+



In [54]:
df_contracts_transactions = df_transactions.join(df_contracts, df_transactions.transactions_client_id == df_contracts.client_id, "left")

In [55]:
df_contracts_transactions = df_contracts_transactions.na.fill(value=0, subset='discount_percentage')

In [56]:
df_contracts_transactions.show()

+----------------------+-------------------+------------+--------------+---------+---------------+-----------+---------+----------+
|transactions_client_id|discount_percentage|total_amount|transaction_id|client_id|    client_name|contract_id|is_active|percentage|
+----------------------+-------------------+------------+--------------+---------+---------------+-----------+---------+----------+
|                  3545|               6.99|        3000|             1|     3545| Magazine Luana|          4|    false|      1.95|
|                  3545|               6.99|        3000|             1|     3545| Magazine Luana|          3|     true|       2.0|
|                  3545|               0.45|        4500|             2|     3545| Magazine Luana|          4|    false|      1.95|
|                  3545|               0.45|        4500|             2|     3545| Magazine Luana|          3|     true|       2.0|
|                  3509|                0.0|       69998|             3|    

In [57]:
df_contracts_transactions = df_contracts_transactions[df_contracts_transactions.is_active == "true"]

In [58]:
df_contracts_transactions_valid_columns = df_contracts_transactions.select("discount_percentage", "total_amount", "transaction_id", "client_id",    "client_name", "contract_id", "percentage")

In [59]:
df_contracts_transactions_valid_columns.show()

+-------------------+------------+--------------+---------+---------------+-----------+----------+
|discount_percentage|total_amount|transaction_id|client_id|    client_name|contract_id|percentage|
+-------------------+------------+--------------+---------+---------------+-----------+----------+
|               6.99|        3000|             1|     3545| Magazine Luana|          3|       2.0|
|               0.45|        4500|             2|     3545| Magazine Luana|          3|       2.0|
|                0.0|       69998|             3|     3509|Lojas Italianas|          5|       1.0|
|                0.0|           1|             4|     3510|     Carrerfive|          6|       3.0|
+-------------------+------------+--------------+---------+---------------+-----------+----------+



In [60]:
df_contracts_transactions_valid_columns = df_contracts_transactions_valid_columns.withColumn("net_value", lit(df_contracts_transactions['total_amount'] - ((df_contracts_transactions['total_amount'] * df_contracts_transactions['discount_percentage'])/100 )))

In [61]:
df_contracts_transactions_valid_columns = df_contracts_transactions_valid_columns.withColumn("discount_percentage", lit((df_contracts_transactions_valid_columns['net_value'] * df_contracts_transactions_valid_columns['percentage'])/100 ))

In [62]:
df_contracts_transactions_valid_columns = df_contracts_transactions_valid_columns.withColumn("total_gain", lit(df_contracts_transactions_valid_columns['net_value'] * df_contracts_transactions_valid_columns['discount_percentage']))

In [63]:
df_contracts_transactions_valid_columns.show()

+-------------------+------------+--------------+---------+---------------+-----------+----------+---------+-------------+
|discount_percentage|total_amount|transaction_id|client_id|    client_name|contract_id|percentage|net_value|   total_gain|
+-------------------+------------+--------------+---------+---------------+-----------+----------+---------+-------------+
| 55.806000000000004|        3000|             1|     3545| Magazine Luana|          3|       2.0|   2790.3|  155715.4818|
|             89.595|        4500|             2|     3545| Magazine Luana|          3|       2.0|  4479.75| 401363.20125|
|             699.98|       69998|             3|     3509|Lojas Italianas|          5|       1.0|  69998.0|4.899720004E7|
|               0.03|           1|             4|     3510|     Carrerfive|          6|       3.0|      1.0|         0.03|
+-------------------+------------+--------------+---------+---------------+-----------+----------+---------+-------------+



<p>Obtendo o ganho total da empresa Acquirer LTDA </p>

In [65]:
df_contracts_transactions_valid_columns.agg({'discount_percentage': 'sum'}).show(truncate=False)

+------------------------+
|sum(discount_percentage)|
+------------------------+
|845.4110000000001       |
+------------------------+



In [67]:
df_contracts_transactions_valid_columns.groupBy("client_name").sum("discount_percentage").show(truncate=False)

+---------------+------------------------+
|client_name    |sum(discount_percentage)|
+---------------+------------------------+
|Carrerfive     |0.03                    |
|Lojas Italianas|699.98                  |
|Magazine Luana |145.401                 |
+---------------+------------------------+

