<a href="https://colab.research.google.com/github/laroccathebrux/DesafioEngenheiroDados/blob/main/Desafio_Engenheiro_de_Dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Desafio Engenheiro de Dados #

O desafio consiste em criar um snippet de código e responder uma pergunta sobre como escalar essa solução. A disponibilização do snippet e resposta da pergunta podem ser feitas via algum site que permita o compartilhamento de código, GitHub ou até mesmo pelo e-mail.

### Estratégia ###
Para realizar este desafio, separei as ações em 4 etapas:
- Salvar dados em arquivos Json
- Usando PySpark ler os dados dos arquivos Json e criar views que permitam o uso de linguagem SQL
- Fazer o cálculo em um único SQL criando um JOIN entre as duas views criadas
- Responder a pergunta final do desafio

# Etapa 1 #
---
Os arquivos foram salvos em formato Json com os seguintes nomes:
- contratos.json
- transacoes.json

# Etapa 2 #
---
Para que possamos utilizar o PySpark, primeiro precisamos instalar a biblioteca

In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 72kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=60937dfa024c5d0096945a1d51726b6962ef15413a1caf6d2decb785557557eb
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


### Importação das bibliotecas ###
Agora que temos o PySpark instalado, precisamos importar as bibliotecas

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import DoubleType

import datetime
import numpy as np
import pandas as pd

### Criação do Objeto spark ###
Foi criado um objeto chamado spark que será usado por todo o código para realizar as ações do Apache Spark.
O nome do aplicativo escolhido foi 'Desafio Engenheiro de Dados'

In [60]:
spark = SparkSession \
    .builder \
    .appName("Desafio Engenheiro de Dados") \
    .getOrCreate()

### Importação dos arquivos Json ###
Ambos arquivos são importados para dois dataframes:
- df_t = lê os dados do arquivo transacoes.json
- dt_c = lê os dados do arquivo contratos.json

In [61]:
path = "./transacoes.json"
df_t = spark.read.json(path)
path = "./contratos.json"
df_c = spark.read.json(path)

### Validar Data Types e Null Values ###
Imprimindo o Schema de df_t e df_c para verificar os data types encontrados automaticamente pelo Spark

In [69]:
df_t.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- discount_percentage: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- transaction_id: long (nullable = true)



In [64]:
df_c.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- client_name: string (nullable = true)
 |-- contract_id: long (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- percentage: double (nullable = true)



Uma vez que temos apenas um valor null em nosso dataset e o mesmo pode ser tratado como um valor double 0, vamos substituí-lo de acordo.

In [65]:
df_t = df_t.fillna({'discount_percentage':'0'})

Vamos mudar o campo total_amount de long (int) para double uma vez que, apesar de possuir valor inteiros, ele é um campo de valor financeiro

In [66]:
df_t = df_t.withColumn("total_amount", df_t["total_amount"].cast(DoubleType()))

### Criar Views ###
Vamos criar duas views baseadas em ambos data frames criados anteriormente.

In [67]:
df_t.createOrReplaceTempView("transactions")
df_c.createOrReplaceTempView("contracts")

Verificando a criação da **view Transactions**

**ATENÇÃO**: Para datasets com mais dados, utilizar na query um LIMIT com valor, para evitar problemas de performance

In [68]:
spark.sql('''
          SELECT * 
          FROM transactions 
          '''
          ).show()

+---------+-------------------+------------+--------------+
|client_id|discount_percentage|total_amount|transaction_id|
+---------+-------------------+------------+--------------+
|     3545|               6.99|      3000.0|             1|
|     3545|               0.45|      4500.0|             2|
|     3509|                0.0|     69998.0|             3|
|     3510|                0.0|         1.0|             4|
|     4510|               40.0|        34.0|             5|
+---------+-------------------+------------+--------------+



Verificando a criação da **view Contracts**

**ATENÇÃO**: Para datasets com mais dados, utilizar na query um LIMIT com valor, para evitar problemas de performance

In [55]:
spark.sql('''
          SELECT * 
          FROM contracts 
          '''
          ).show()

+---------+---------------+-----------+---------+----------+
|client_id|    client_name|contract_id|is_active|percentage|
+---------+---------------+-----------+---------+----------+
|     3545| Magazine Luana|          3|     true|       2.0|
|     3545| Magazine Luana|          4|    false|      1.95|
|     3509|Lojas Italianas|          5|     true|       1.0|
|     3510|      Carrefive|          6|     true|       3.0|
+---------+---------------+-----------+---------+----------+



# Etapa 3 - Cálculo #
A estratégia para o cálculo foi somar os valores das views respeitando os seguintes critérios:
- valor de total_amount subtraído da porcentagem de desconto
- resultado anterior multiplicado pela porcentagem do contrato
- respeitando apenas os registros cujo campo is_active for igual a True
- Os registros de transactions que não existem em contracts, não entrarão no cálculo devido ao JOIN realizado

In [73]:
spark.sql('''
          SELECT SUM(((t.total_amount-(t.total_amount*discount_percentage/100))*c.percentage/100)) AS total_amount FROM contracts AS c
          INNER JOIN transactions as t ON t.client_id = c.client_id
          WHERE c.is_active=true
          '''
          ).show()

+-----------------+
|     total_amount|
+-----------------+
|845.4110000000001|
+-----------------+



# Etapa 4 - Resposta da Pergunta #

Além do código acima, considere que uma escala de ~200 milhões de transações por dia e que o cálculo deverá apresentar um resultado do valor total do mês. Descreva em até 500 palavras que tecnologias e arquitetura você usaria para escalar a solução acima.

Num caso de ~200 milhões de transações dia e sabendo que os dados capturados serão utilizados para outros fins além do cálculo apresentado, seria prudente utilizar alguma solução de streaming de dados.
Porém a pergunta deste desafio não especifica outras regras de negócio, portanto me basearei na ideia de que iremos apenas calcular o total_amount dos dados recebidos.

**Premissa**

São ~200 milhões de transações dia, portanto precisaremos fazer mais de um cálculo para evitar computar ~6 bilhões de dados mês. Sendo assim, devemos fazer o seguinte:
- Gravar um arquivo json com os registros em um storage (Azure Blob ou AWS S3), formando assim nosso data lake.
- Com o uso do Apache Spark e Python, vamos criar um cluster para poder processar os dados de modo distribuído (Azure Databricks ou AWS Redshift)
- Após o cálculo feito, salvar o registro em um Banco de dados. Neste caso não há necessidade de um banco de dados não relacional, podendo ser utilizado um Azure SQL Server ou AWS Postgres. Mas caso seja necessário o uso de um banco no-sql, então podemos utilizar o Azure CosmosDB ou AWS com Apache Cassandra.
- Criaremos um novo arquivo python que lerá os dados do banco de dados escolhido e somará os resultados obtidos
- Para que tudo isso faça sentido, criaremos 2 data pipelines no Apache Airflow, o primeiro rodando diariamente, fazendo o calculo e salvando no banco de dados. O segundo rodando mensalmente, chamando o segundo arquivo python que faz a soma dos registros guardados no banco.
- Para realizar os cálculos nos meses seguintes, podemos adicionar uma data nos registros guardados, ou podemos simplesmente fazer um drop das tabelas todos os meses.

**Ponderações**
- Sabemos que a utilização de JOIN entre tabelas é um recurso bastante útil, mas estremamente lento, portanto o código acima deverá ser adaptado para poder executar o cálculo de maneira mais eficiente. (um loop em ambos Data Frame seria suficiente para realizar o calculo de maneira satisfatória)
