# Entidade NotebookCaseTecnicoJoin

## Overview

 | Detail Tag | Information |
 |------------|-------------|
 |Originally Created By |  Ronnan Conceição Lima |
 |Output Datasets | arquivos parquet/delta </li></ul>|
 |Input Data Source | psql-database |
 |Output Data Source | Delta Files, Parquet Files |


## Histórico de atualização
 | Date | Developed By | Reason |
 |:----:|--------------|--------|
 |04/10/2024 | Ronnan Conceição Lima | Criação de Notebook, funções e Variaveis para gravação e resolução do case técnico. |



## Other Details

- Case Prático
- Utilize o Database Type Ecommerce disponível em:
- Host: psql-mock-database-cloud.postgres.database.azure.com
- Banco de Dados: ecom1692155331663giqokzaqmuqlogbu
- Port: 5432
- Username: eolowynayhvayxbhluzaqxfp@psql-mock-database-cloud
- Password: hdzvzutlssuozdonhflhwyjm

1. Desenhe o diagrama de ER do banco. Imagem, DBML ou diagrama são aceitáveis.

2. Crie uma conta no Databricks Community 
(https://community.cloud.databricks.com)

3. Realizar a ingestão das tabelas do Database Type Ecommerce no formato .parquet (1 arquivo por tabela) usando PySpark

4. Criar as querys ou código utilizando a linguagem de sua preferência que respondam às seguintes perguntas:
a.	Qual país possui a maior quantidade de itens cancelados?
b.	Qual o faturamento da linha de produto mais vendido, considere os pedidos com status 'Shipped', cujo pedido foi realizado no ano de 2005?
c.	Traga na consulta o Nome, sobrenome e e-mail dos vendedores do Japão, lembrando que o local-part do e-mail deve estar mascarado.

5. Salve os resultados em formato delta.

6. Crie um README.md documentando a solução e o processo usado para chegar no resultado.

7. Disponibilize os artefatos (Notebooks, README.md, Diagrama de ER) num repositório público no GitHub para consulta.

Perguntas Teóricas
1. Como você utiliza o Delta Lake no Azure Databricks para garantir a integridade dos dados?
2. Quais são as vantagens do uso do Spark em comparação com outras tecnologias de processamento de dados?
3. Descreva um caso em que você precisou sincronizar dados entre diferentes sistemas.
4. Desenhe uma arquitetura de dados comentada para uma empresa que utiliza Azure e Databricks, incluindo armazenamento, processamento e análise.
5. Como você garante a escalabilidade e a robustez da arquitetura de dados?
6. Como você implementa a criptografia de dados em repouso e em trânsito?
7. Como você gerencia a qualidade dos dados em um pipeline de dados?
8. Qual a importância do FinOps para a engenharia de dados?
9. Como o DevOps ajuda o engenheiro de dados?
10. Como iniciamos um projeto de pipeline de dados?
11. Como realizar CI/CD em um pipeline de dados?
12. Quais ferramentas de orquestração você já trabalhou?
13. Quais suas motivações para ser um engenheiro de dados?

In [0]:
from delta.tables import DeltaTable

In [0]:
# Parâmetros de conexão
url = "jdbc:postgresql://psql-mock-database-cloud.postgres.database.azure.com:5432/ecom1692155331663giqokzaqmuqlogbu"
properties = {
    "user": "eolowynayhvayxbhluzaqxfp@psql-mock-database-cloud",
    "password": "hdzvzutlssuozdonhflhwyjm",
    "driver": "org.postgresql.Driver"
}

# Função auxiliar para leitura de dados do banco
def ler_do_banco(query):
    return spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("query", query) \
        .option("user", properties["user"]) \
        .option("password", properties["password"]) \
        .option("driver", properties["driver"]) \
        .load()

# Função para listar esquemas
def listar_esquemas():
    query = "SELECT schema_name FROM information_schema.schemata"
    df_esquemas = ler_do_banco(query)
    esquemas = df_esquemas.select("schema_name").collect()
    print("Esquemas disponíveis:")
    for esquema in esquemas:
        print(esquema.schema_name)

# Função para listar tabelas em um esquema específico
def listar_tabelas(esquema):
    query = f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{esquema}'"
    df_tabelas = ler_do_banco(query)
    tabelas = df_tabelas.select("table_name").collect()
    print(f"Tabelas no esquema '{esquema}':")
    for tabela in tabelas:
        print(tabela.table_name)

In [0]:
listar_esquemas()
listar_tabelas("public")
schema_teste = 'public'
tabela_teste = 'customers'
df_dados = ler_do_banco(f'''SELECT * FROM {schema_teste}.{tabela_teste} limit 1 ''')
df_dados.display()

Esquemas disponíveis:
pg_catalog
public
information_schema
Tabelas no esquema 'public':
customers
employees
offices
orderdetails
orders
payments
product_lines
products
pg_stat_statements
pg_buffercache


customer_number,customer_name,contact_last_name,contact_first_name,phone,address_line1,address_line2,city,state,postal_code,country,sales_rep_employee_number,credit_limit
103,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370,21000.0


In [0]:
tabelas = ["customers", "employees", "offices", "orderdetails", "orders", "payments", "product_lines", "products"]

In [0]:
for tabela in tabelas:
    try:
        df = ler_do_banco(f'''SELECT * FROM public.{tabela}''')
        df.write.mode("overwrite").parquet(f"/dbfs/FileStore/tabelas_parquet/{tabela}.parquet")
    except Exception as e:
        if 'already exists' in str(e):
            print(f"Path for {tabela} already exists. Overwriting...")
            df.write.mode("overwrite").parquet(f"/dbfs/FileStore/tabelas_parquet/{tabela}.parquet")
        else:
            print(f"Erro ao salvar a tabela {tabela}: {e}")

In [0]:

# Pergunta A: Qual país possui a maior quantidade de itens cancelados?
query_pais_maior_cancelamento = """
    SELECT c.country, COUNT(od.order_number) AS total_cancelled
    FROM orders o
    JOIN orderdetails od ON o.order_number = od.order_number
    JOIN customers c ON o.customer_number = c.customer_number
    WHERE o.status = 'Cancelled'
    GROUP BY c.country
    ORDER BY total_cancelled DESC
    LIMIT 1
"""

In [0]:
df_pais_maior_cancelamento = ler_do_banco(query_pais_maior_cancelamento)
print(" Gravando os dados da tabela pais_maior_cancelamento ")

df_pais_maior_cancelamento.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/dbfs/FileStore/resultados_delta/pais_maior_cancelamento.delta")

df_pais_maior_cancelamento.display()

 Gravando os dados da tabela pais_maior_cancelamento 


country,total_cancelled
New Zealand,19


In [0]:
# Pergunta B: Qual o faturamento da linha de produto mais vendido (status 'Shipped' em 2005)?
query_faturamento_produto_mais_vendido_2005 = """
    SELECT pl.product_line, CAST(SUM(od.price_each * od.quantity_ordered) AS FLOAT) AS faturamento
    FROM orders o
    JOIN orderdetails od ON o.order_number = od.order_number
    JOIN products p ON p.product_code = od.product_code
    JOIN product_lines pl ON pl.product_line = p.product_line
    WHERE o.status = 'Shipped' AND EXTRACT(YEAR FROM o.order_date) = 2005
    GROUP BY pl.product_line
    ORDER BY faturamento DESC
    LIMIT 1
"""

In [0]:
df_faturamento_produto_mais_vendido_2005 = ler_do_banco(query_faturamento_produto_mais_vendido_2005)
print(" Gravando os dados da tabela faturamento_produto_mais_vendido_2005 ")

df_faturamento_produto_mais_vendido_2005.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/dbfs/FileStore/resultados_delta/faturamento_produto_mais_vendido_2005.delta")

df_faturamento_produto_mais_vendido_2005.display()

 Gravando os dados da tabela faturamento_produto_mais_vendido_2005 


product_line,faturamento
Classic Cars,603666.99


In [0]:
# Pergunta C: Nome, sobrenome e e-mail dos vendedores do Japão, com e-mail mascarado
query_vendedores_japao_mascarado = """
SELECT 
    CONCAT(e.first_name, ' ', e.last_name) AS nome_completo,
    CONCAT(SUBSTRING(e.email, 1, 3), '***', RIGHT(e.email, LENGTH(e.email) - POSITION('@' IN e.email) + 1)) AS email_mascarado
FROM 
    employees e
JOIN 
    offices o ON e.office_code = o.office_code
WHERE 
    o.country = 'Japan'
"""

In [0]:
df_vendedores_japao_mascarado = ler_do_banco(query_vendedores_japao_mascarado)
print(" Gravando os dados da tabela vendedores_japao_mascarado ")

df_vendedores_japao_mascarado.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/dbfs/FileStore/resultados_delta/vendedores_japao_mascarado.delta")

df_vendedores_japao_mascarado.display()

 Gravando os dados da tabela vendedores_japao_mascarado 


nome_completo,email_mascarado
Mami Nishi,mni***@classicmodelcars.com
Yoshimi Kato,yka***@classicmodelcars.com


In [0]:
# Otimizando os arquivos Delta
delta_a = DeltaTable.forPath(spark, "/dbfs/FileStore/resultados_delta/pais_maior_cancelamento.delta")
delta_a.optimize().executeCompaction()

delta_b = DeltaTable.forPath(spark, "/dbfs/FileStore/resultados_delta/faturamento_produto_mais_vendido_2005.delta")
delta_b.optimize().executeCompaction()

delta_c = DeltaTable.forPath(spark, "/dbfs/FileStore/resultados_delta/vendedores_japao_mascarado.delta")
delta_c.optimize().executeCompaction()

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,