# Desafio DBC
#### Desenvolvedor: Teddy Ordoñez
#### Data: 6/12/2024

Você deve primeiramente modelar a estrutura descrita anteriormente em um banco de dados de sua preferência, após isso criar um componente capaz de ler os dados das tabelas criadas e escrever e um único arquivo flat, em um diretório local, ou seja, todas as tabelas devem ser agregadas para formar uma visão única, onde a estrutura do arquivo é representada da seguinte forma:

![Estrutura final](/files/images/estrutura_final_dbc.png)


## Resolução

### 1. Criação do banco de dados
Para resolver o desafio criado pela DBC Company para a vaga de engenheiro de dados pleno foi desenvolvido um banco de dados na nuvem Azure. O serviço utilizado foi o **Azure Database for PostgreSQL** em modo de desenvolvimento. O banco de dados foi configurado na conta pessoal do desenvolvedor. Uma vez o banco de dados foi criado no ambiente nuvem foi necessário se conectar ao mesmo utilizando a ferramenta **DBeaver**. 

Ao se conectar utilizando a ferramenta foi possível executar os códigos SQL para a criação da estrutura mostrada no **README** deste projeto, os códigos estão disponíveis na pasta SQL. Uma vez as tabelas são criadas é possível passar para a inserção de dados fictícios. Para a criação dos dados fictícios foi utilizado o ChatGPT, onde foi específicado a estrutura de cada uma das tabelas e foi solicitado que cria-se 50 associados, 50 contas, 50 cartoões e 200 movimentações (cada associado com 4 transações). Feito isso a estrutura estava pronta para inciar o processo de ETL.

### 2. Processo de ETL 

Para realizar o processo de ETL foi escolhida a ferramenta **Databricks** na sua versão de graça, conhecida como **Databricks Community** em conjunto com a ferramenta de PySpark.

Tendo em vista que seria necessário ter funções que seriam utilizadas várias vezes foi criada uma classe Utils para facilitar a reutilização de código. Nessa classe é possível encontrar códigos como a escrita de arquivos CSVs, tabelas, execução de querys para o banco de dados, entre outras.

Como primeiro passo é necessário obter o IP do Databricks Community para poder liberar o acesso no banco de dados com o fim de evitar acessos não desejados. Obtendo o endereço IP é realizado a liberação de acesso para leitura dos dados do banco de dados. 

#### 2.1 Leitura de dados e criação de camada bronze
O primeiro passo é conectar-se ao banco de dados utilizando o JDBC. Na classe Utils foram criados os dados de acesso para não ter eles expostos no código, nela é possível encontrar a URL de acesso, usuário e senha. Para isso utilizamos a função chamada `sql_query_executor` passando por parâmetro a URL, nome da tabela e propriedades de login. 

Uma vez que os dados são lidos do banco de dados é possível criar a camada **bronze**, para isso os arquivos foram separados em pastas por tabelas onde o Spark irá criar o arquivo csv. Ao mesmo tempo os dados foram salvos dentro do Catalog `hive_metastore` no Schema `default`, tendo como prefixo o nome da camada específicada, por exemplo `bronze_associados`.

#### 2.2 Transformação de dados
Após a leitura dos dados da camada bronze é necessário realizar as transformações para padronizar os dados. As transformações aqui aplicadas foram simples como por exemplo:
* Colunas do tipo `string` foram convertidas em maiúsculas;
* Especificando o tipo da coluna para garantir qualidade nos dados.

Após transformar os dados e salvar eles em variáveis novas, as mesmas seriam salvas na pasta específicada para a camada **silver**. Foram utilizadas as funções `write_csv` e `save_as_table` para atingir o resultado desejado.

#### 2.3 Agrupamento de tabelas
Como foi descrito na introdução do desafio seria necessário chegar na estrutura específicada pelos avaliadores, chamada de **movimento_flat**. Para atingir o objetivo mencionado anteriormente é necessário unir as tabelas com o fim de obter uma grande tabela com todos os dados esperados. 

1. Primeiramente as tabelas de **contas** e **associados** forma unidas utilizando a coluna `associado_id` utilizando o inner join. Ao mesmo tempo de fazer a união de ambos os DataFrames foram dados alias para as colunas para já ter os nomes esperados na estrutura final. A DataFrame resultante foi chamado de **associado_conta_df**.

2. Após isso a união das tabelas **cartoes** e **movimentacoes** foram unidas utilizando a coluna `cartao_id` utilizando o inner join. Assim como mencionado no passo anterior, alias foram dados para as colunas. A DataFrame resultante foi chamado de **movimento_cartao_df**.

3. Finalmente a união das tabelas **associado_conta_df** e **movimento_cartao_df** foi realizado para obter o resultado final, o qual foi salvo na variável `movimento_flat_df`. Para realizar a união destes DataFrames foram utilizadas as chaves `associado_id` e `conta_id` para garantir o resultado esperado. 

4. Após obter a estrutura desejada as colunas foram reordenadas para obter a ordem específicada na imagem apresentada na introdução da documentação.

5. A estrutura desejada foi salva em arquivo CSV e como tabela com o prefixo de silver já que os dados estão de forma analítica. 

#### 2.4 Sumarização tabela Movimento Flat
Com o intuíto de se ter os dados prontos para consumo pelos analistas de dados foi criada uma tabela com os dados sumarizados a partir da tabela **movimento_flat**. Os dados foram agrupados pelas seguintes colunas: 
* `nome_associado`
* `sobrenome_associado`
* `idade_associado`
* `numero_cartao`
* `tipo_conta`

Foram criadas 5 novas colunas com os dados agregados para cada um dos associados. As colunas criadas foram:
* `total_gastos` -> Soma de todos os gastos realizados
* `num_transacoes` -> Quantidade de transações realizadas
* `gasto_medio` -> Média dos gastos realizados
* `gasto_maximo` -> Gasto máximo realizado
* `gasto_minimo` -> Gasto minimo realiado


Com essas colunas é possível criar um dashboard para poder realizar uma análise mais aprofundada e entender o comportamento dos clientes fictícios da base de dados. 

Esta tabela foi chamada de **resumo_gastos**, ela se encontra na camada **Gold** já que ela contém os dados sumarizados e prontos para consumo por analistas de dados.

In [0]:
%run ./Utils

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, DecimalType, TimestampType

In [0]:
# Obtendo o IP do databricks para inserir nas permissões do banco de dados
databricks_ip = Utils.get_databricks_ip()
print(databricks_ip)

54.244.74.60


In [0]:
Utils.delete_layers_content()

In [0]:
url = Utils.get_jdbc_url()  # Obtendo a URL para conectar no banco de dados
connection_properties = Utils.get_connection_properties()  # Obtendo as credenciais do banco de dados

# Lendo cada uma das tabelas do banco de dados
associados_df = Utils.sql_query_executor(url, 'associados', connection_properties)
contas_df = Utils.sql_query_executor(url, 'contas', connection_properties)
cartoes_df = Utils.sql_query_executor(url, 'cartoes', connection_properties)
movimentacoes_df = Utils.sql_query_executor(url, 'movimentacoes', connection_properties)


Criando a camada bronze dos dados com arquivos CSV e tabelas no Unity Catalog

In [0]:
# Salvando cada um dos DataFrames em arquivos CSV
Utils.write_csv(associados_df, 'bronze', 'associados')
Utils.write_csv(contas_df, 'bronze', 'contas')
Utils.write_csv(cartoes_df, 'bronze', 'cartoes')
Utils.write_csv(movimentacoes_df, 'bronze', 'movimentacoes')

Arquivo 'associados' salvo com sucesso.
Arquivo 'contas' salvo com sucesso.
Arquivo 'cartoes' salvo com sucesso.
Arquivo 'movimentacoes' salvo com sucesso.


In [0]:
# Criando tabelas na camada bronze
Utils.save_as_table(associados_df, 'bronze', 'associados')
Utils.save_as_table(contas_df, 'bronze', 'contas')
Utils.save_as_table(cartoes_df, 'bronze', 'cartoes')
Utils.save_as_table(movimentacoes_df, 'bronze', 'movimentacoes')

Tabela 'bronze_associados' salva com sucesso.
Tabela 'bronze_contas' salva com sucesso.
Tabela 'bronze_cartoes' salva com sucesso.
Tabela 'bronze_movimentacoes' salva com sucesso.


Aplçicando transformações simples para ter a camada Silver dos dados.

Transformações aplicadas:
* Colunas do tipo **string** em maiúsculo
* Especificando o tipo da coluna.

In [0]:
# Transformando DataFrame de associados
associados_silver_df = associados_df.select(
    col('associado_id').cast(IntegerType()),
    f.upper(col('nome')).cast(StringType()).alias('nome'),
    f.upper(col('sobrenome')).cast(StringType()).alias('sobrenome'),
    col('idade').cast(IntegerType()),
    f.upper(col('email')).cast(StringType()).alias('email')
)

# Transformando DataFrame de Contas
contas_silver_df = contas_df.select(
    col('conta_id').cast(IntegerType()),
    f.upper(col('tipo')).cast(StringType()).alias('tipo'),
    col('data_criacao').cast(TimestampType()),
    col('associado_id').cast(IntegerType())
)

# Transformando DataFrame de Cartões
cartoes_silver_df = cartoes_df.select(
    col('cartao_id').cast(IntegerType()),
    col('num_cartao').cast(StringType()),
    f.upper(col('nom_impresso')).cast(StringType()).alias('nom_impresso'),
    col('data_criacao').cast(TimestampType()),
    col('conta_id').cast(IntegerType()),
    col('associado_id').cast(IntegerType())
)

# Transformando DataFrame de Movimentações
movimentacoes_silver_df = movimentacoes_df.select(
    col('movimentacao_id').cast(IntegerType()),
    col('vlr_transacao').cast(DecimalType()),
    f.upper(col('des_transacao')).alias('des_transacao'),
    col('data_movimento').cast(TimestampType()),
    col('cartao_id').cast(IntegerType())
)

In [0]:
# Salvando DataFrames na camada silver
Utils.write_csv(associados_silver_df, 'silver', 'associados')
Utils.write_csv(contas_silver_df, 'silver', 'contas')
Utils.write_csv(cartoes_silver_df, 'silver', 'cartoes')
Utils.write_csv(movimentacoes_silver_df, 'silver', 'movimentacoes')

Arquivo 'associados' salvo com sucesso.
Arquivo 'contas' salvo com sucesso.
Arquivo 'cartoes' salvo com sucesso.
Arquivo 'movimentacoes' salvo com sucesso.


In [0]:
# Salvando os DataFrames como tabelas na camada silver
Utils.save_as_table(associados_silver_df, 'silver', 'associados')
Utils.save_as_table(contas_silver_df, 'silver', 'contas')
Utils.save_as_table(cartoes_silver_df, 'silver', 'cartoes')
Utils.save_as_table(movimentacoes_silver_df, 'silver', 'movimentacoes')

Tabela 'silver_associados' salva com sucesso.
Tabela 'silver_contas' salva com sucesso.
Tabela 'silver_cartoes' salva com sucesso.
Tabela 'silver_movimentacoes' salva com sucesso.


In [0]:
# Unindo a tabela de Associados e Contas
associado_conta_df = associados_silver_df.join(contas_silver_df, on='associado_id').select(  # Unindo pela chave associado_id
    associados_silver_df.nome.alias('nome_associado'),
    associados_silver_df.sobrenome.alias('sobrenome_associado'),
    associados_silver_df.idade.alias('idade_associado'),
    contas_silver_df.tipo.alias('tipo_conta'),
    contas_silver_df.data_criacao.alias('data_criacao_conta'),
    associados_silver_df.associado_id,
    contas_silver_df.conta_id
)

# Unindo a tabela de Cartões com Movimentações
movimento_cartao_df = cartoes_silver_df.join(movimentacoes_silver_df, on='cartao_id').select(  # Unindo pela chave cartao_id
    movimentacoes_silver_df.vlr_transacao.alias('vlr_transacao_movimento'),
    movimentacoes_silver_df.des_transacao.alias('des_transacao_movimento'),
    movimentacoes_silver_df.data_movimento,
    cartoes_silver_df.num_cartao.alias('numero_cartao'),
    cartoes_silver_df.nom_impresso.alias('nome_impresso_cartao'),
    cartoes_silver_df.data_criacao.alias('data_criacao_cartao'),
    cartoes_silver_df.conta_id,
    cartoes_silver_df.associado_id
)

# Unindo as duas tabelas resultantes em uma só para obter o resultado final
movimento_flat_df = associado_conta_df.join(movimento_cartao_df, on=['associado_id', 'conta_id']).drop(  # Unindo pelas chaves associado_id e conta_id
    associado_conta_df.associado_id,
    associado_conta_df.conta_id
)

# Especificando a ordem das colunas
column_order = [
    'nome_associado',
    'sobrenome_associado',
    'idade_associado',
    'vlr_transacao_movimento',
    'des_transacao_movimento',
    'data_movimento',
    'numero_cartao',
    'nome_impresso_cartao',
    'data_criacao_cartao',
    'tipo_conta',
    'data_criacao_conta'
]

# Aplicando a ordem desejada
movimento_flat_df = movimento_flat_df.select(column_order)

# Mostrando resultado
display(movimento_flat_df)


nome_associado,sobrenome_associado,idade_associado,vlr_transacao_movimento,des_transacao_movimento,data_movimento,numero_cartao,nome_impresso_cartao,data_criacao_cartao,tipo_conta,data_criacao_conta
DIEGO,BARROS,37,175,COMPRA SUPERMERCADO,2024-02-01T10:30:00Z,1122334455667789,DIEGO BARROS,2024-01-01T16:45:00Z,CORRENTE,2024-06-01T14:30:00Z
DIEGO,BARROS,37,150,COMPRA SUPERMERCADO,2024-04-01T10:30:00Z,1122334455667789,DIEGO BARROS,2024-01-01T16:45:00Z,CORRENTE,2024-06-01T14:30:00Z
DIEGO,BARROS,37,180,COMPRA SUPERMERCADO,2024-06-01T10:30:00Z,1122334455667789,DIEGO BARROS,2024-01-01T16:45:00Z,CORRENTE,2024-06-01T14:30:00Z
DIEGO,BARROS,37,150,COMPRA SUPERMERCADO,2024-08-01T10:30:00Z,1122334455667789,DIEGO BARROS,2024-01-01T16:45:00Z,CORRENTE,2024-06-01T14:30:00Z
ROBERTA,ANTUNES,28,50,ASSINATURA MENSAL,2024-02-04T14:10:00Z,4455667788990012,ROBERTA ANTUNES,2024-01-04T14:00:00Z,POUPANÇA,2023-12-01T10:40:00Z
ROBERTA,ANTUNES,28,75,ASSINATURA MENSAL,2024-04-04T14:10:00Z,4455667788990012,ROBERTA ANTUNES,2024-01-04T14:00:00Z,POUPANÇA,2023-12-01T10:40:00Z
ROBERTA,ANTUNES,28,120,ASSINATURA MENSAL,2024-06-04T14:10:00Z,4455667788990012,ROBERTA ANTUNES,2024-01-04T14:00:00Z,POUPANÇA,2023-12-01T10:40:00Z
ROBERTA,ANTUNES,28,120,ASSINATURA MENSAL,2024-08-04T14:10:00Z,4455667788990012,ROBERTA ANTUNES,2024-01-04T14:00:00Z,POUPANÇA,2023-12-01T10:40:00Z
JÚLIA,FONSECA,31,150,JANTAR EM RESTAURANTE,2024-01-28T12:15:00Z,8899001122334455,JÚLIA FONSECA,2023-12-28T12:15:00Z,POUPANÇA,2023-09-10T16:45:00Z
JÚLIA,FONSECA,31,500,JANTAR EM RESTAURANTE,2024-03-28T12:15:00Z,8899001122334455,JÚLIA FONSECA,2023-12-28T12:15:00Z,POUPANÇA,2023-09-10T16:45:00Z


In [0]:
# Salvando DataFrame na camada Silver
Utils.write_csv(movimento_flat_df, 'silver', 'movimento_flat')

Arquivo 'movimento_flat' salvo com sucesso.


In [0]:
# Criando tabela na camada silver
Utils.save_as_table(movimento_flat_df, 'silver', 'movimento_flat')

Tabela 'silver_movimento_flat' salva com sucesso.


In [0]:
# Agrupar e sumarizar os gastos por pessoa e cartão
resumo_gastos_df = movimento_flat_df.groupBy(
    'nome_associado',
    'sobrenome_associado',
    'idade_associado',
    'numero_cartao',
    'tipo_conta'
).agg(  # Realizando diferentes agregações para obter um sumarizado dos dados
    f.sum(col('vlr_transacao_movimento')).alias('total_gastos'),
    f.count('*').alias('num_transacoes'),
    f.avg(col('vlr_transacao_movimento')).alias('gasto_medio'),
    f.max(col('vlr_transacao_movimento')).alias('gasto_maximo'),
    f.min(col('vlr_transacao_movimento')).alias('gasto_minimo')
)

# Mostrando resultado
display(resumo_gastos_df)


nome_associado,sobrenome_associado,idade_associado,numero_cartao,tipo_conta,total_gastos,num_transacoes,gasto_medio,gasto_maximo,gasto_minimo
ALINE,RIBEIRO,29,4444555566667777,POUPANÇA,190,4,47.5,50,40
NATÁLIA,DIAS,26,4455667788990011,POUPANÇA,385,4,96.25,120,75
LUCAS,MARTINS,33,1234123412341234,CORRENTE,875,4,218.75,300,175
SÉRGIO,MORAIS,45,5566778899001123,CORRENTE,870,4,217.5,230,200
AUGUSTO,ANDRADE,37,1122334455667781,CORRENTE,1350,4,337.5,350,300
LARISSA,MELO,24,2345234523451234,POUPANÇA,356,4,89.0,120,76
GUILHERME,TEIXEIRA,28,1122334455667788,CORRENTE,621,4,155.25,220,100
JOSÉ,MOREIRA,30,5566778899001124,CORRENTE,770,4,192.5,230,100
FELIPE,ARAÚJO,42,7890789078905678,CORRENTE,781,4,195.25,320,60
MANUELA,AGUIAR,28,8899001122334457,POUPANÇA,1550,4,387.5,500,350


In [0]:
# Salvando DataFrame na camada gold
Utils.write_csv(resumo_gastos_df, 'gold', 'resumo_gastos')

Arquivo 'resumo_gastos' salvo com sucesso.


In [0]:
# Criando tabela na camada gold
Utils.save_as_table(resumo_gastos_df, 'gold', 'resumo_gastos')

Tabela 'gold_resumo_gastos' salva com sucesso.
