###Libs

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

###Configurações de conexão

In [0]:
# PostgreSQL source connection settings.
# Credentials come from environment variables instead of being hardcoded: anything written
# in a notebook is visible to everyone with access to the workspace/repository and to its
# exported copies. NOTE(review): on Databricks, prefer dbutils.secrets.get(scope, key) — TODO
# wire a secret scope. Non-secret settings keep their previous values as defaults.
host = os.environ.get("PG_HOST", "psql-mock-database-cloud.postgres.database.azure.com")
database = os.environ.get("PG_DATABASE", "ecom1692155331663giqokzaqmuqlogbu")
port = os.environ.get("PG_PORT", "5432")
username = os.environ.get("PG_USER", "")
password = os.environ.get("PG_PASSWORD", "")

# JDBC URL
url = f"jdbc:postgresql://{host}:{port}/{database}"

In [0]:
# Start (or reuse) the Spark session, requesting the PostgreSQL JDBC driver package.
# NOTE(review): spark.jars.packages is typically only honored when a new session/JVM is
# started; on an already-running Databricks cluster the driver may need to be a cluster library.
session_builder = SparkSession.builder.appName("PostgreSQL Connection")
session_builder = session_builder.config("spark.jars.packages", "org.postgresql:postgresql:42.2.20")
spark = session_builder.getOrCreate()

###Acessando o Banco

In [0]:
# Subquery listing the tables in the `public` schema, pushed down to PostgreSQL via JDBC
public_tables_query = (
    "(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') as tables"
)

tables_df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("dbtable", public_tables_query)
    .option("user", username)
    .option("password", password)
    .load()
)

# Print the table names found
tables_df.show()

+------------------+
|        table_name|
+------------------+
|         customers|
|         employees|
|           offices|
|      orderdetails|
|            orders|
|          payments|
|     product_lines|
|          products|
|pg_stat_statements|
|    pg_buffercache|
+------------------+



###Realizando discovery da tabela Customers

In [0]:
# Read the whole `customers` table from PostgreSQL through the JDBC data source
jdbc_reader = spark.read.format("jdbc")
for option_key, option_value in (
    ("url", url),
    ("dbtable", "customers"),
    ("user", username),
    ("password", password),
):
    jdbc_reader = jdbc_reader.option(option_key, option_value)
customers_jdbc = jdbc_reader.load()

In [0]:
# Render the customers DataFrame read over JDBC in the previous cell
display(customers_jdbc)

customer_number,customer_name,contact_last_name,contact_first_name,phone,address_line1,address_line2,city,state,postal_code,country,sales_rep_employee_number,credit_limit
103,Jake,King,Carine,40.32.2555,"54, rue Royale",,Nantes,Victoria,44000,France,1370.0,21000.0
112,Signal Gift Store,King,Jean,7025551838,8489 Strong St.,,Las Vegas,New York,83030,USA,1166.0,71800.0
114,"Australian Collectors, Co.",Ferguson,Peter Sr.,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611.0,117300.0
119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370.0,118200.0
121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504.0,81700.0
124,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,,San Rafael,CA,97562,USA,1165.0,210500.0
125,Havel & Zbyszek Co,Piestrzeniewicz,Zbyszek,(26) 642-7555,ul. Filtrowa 68,,Warszawa,,01-012,Poland,,0.0
128,"Blauer See Auto, Co.",Keitel,Roland,+49 69 66 90 2555,Lyonerstr. 34,,Frankfurt,,60528,Germany,1504.0,59700.0
129,Mini Wheels Co.,Murphy,Julie,6505555787,5557 North Pendale Street,,San Francisco,CA,94217,USA,1165.0,64600.0
131,Land of Toys Inc.,Lee,Kwai,2125557818,897 Long Airport Avenue,,NYC,NY,10022,USA,1323.0,114900.0


In [0]:
columns_info = customers_jdbc.schema  # schema of the JDBC customers read; its .fields are printed in the next cell

In [0]:
# Print every column of the customers schema together with its Spark data type
for col_name, col_type in ((f.name, f.dataType) for f in columns_info.fields):
    print(f"Nome da Coluna: {col_name}, Tipo: {col_type}")

Nome da Coluna: customer_number, Tipo: IntegerType()
Nome da Coluna: customer_name, Tipo: StringType()
Nome da Coluna: contact_last_name, Tipo: StringType()
Nome da Coluna: contact_first_name, Tipo: StringType()
Nome da Coluna: phone, Tipo: StringType()
Nome da Coluna: address_line1, Tipo: StringType()
Nome da Coluna: address_line2, Tipo: StringType()
Nome da Coluna: city, Tipo: StringType()
Nome da Coluna: state, Tipo: StringType()
Nome da Coluna: postal_code, Tipo: StringType()
Nome da Coluna: country, Tipo: StringType()
Nome da Coluna: sales_rep_employee_number, Tipo: IntegerType()
Nome da Coluna: credit_limit, Tipo: DecimalType(10,2)


###Salvando as tabelas no formato de Parquet

In [0]:
table_names = ["customers", "employees", "orderdetails", "orders", "payments", "product_lines", "products", "offices"]

# Directory where the Parquet files are written.
# NOTE(review): Spark resolves this path on DBFS, so the data lands under
# dbfs:/dbfs/FileStore/tables/case_rnp (see the listing further down); the `/dbfs/` prefix is
# usually reserved for the local FUSE mount. Kept as-is because later cells read from here.
output_directory = "/dbfs/FileStore/tables/case_rnp"

# Ingest every source table over JDBC and persist it in Parquet format
for table_name in table_names:
    df = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        .load()
    )
    parquet_path = f"{output_directory}/{table_name}.parquet"
    # mode("overwrite") keeps the cell re-runnable: the default save mode ("errorifexists")
    # raises as soon as the target directory exists, i.e. on every run after the first.
    df.write.mode("overwrite").parquet(parquet_path)


In [0]:
# List the ingestion output directory and load each Parquet dataset back as a DataFrame.
arquivos = dbutils.fs.ls(output_directory)
# dbutils.fs.ls reports each Parquet dataset as a directory whose path ends with '/'
# (e.g. ".../customers.parquet/"), so strip it before matching the extension.
parquet_files = [arquivo.path for arquivo in arquivos if arquivo.path.rstrip("/").endswith(".parquet")]

# DataFrames keyed by table name, so the loaded data is not discarded on the next iteration
parquet_dfs = {}
for parquet_file in parquet_files:
    print(parquet_file)
    # rstrip("/") so the last path segment is the dataset name rather than an empty string
    table_name = parquet_file.rstrip("/").split("/")[-1].replace(".parquet", "")
    df = spark.read.parquet(parquet_file)
    parquet_dfs[table_name] = df

In [0]:
# Show the entries returned by dbutils.fs.ls for the Parquet output directory (paths saved in DBFS)
display(arquivos)

path,name,size,modificationTime
dbfs:/dbfs/FileStore/tables/case_rnp/customers.parquet/,customers.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/employees.parquet/,employees.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/offices.parquet/,offices.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/orderdetails.parquet/,orderdetails.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/orders.parquet/,orders.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/payments.parquet/,payments.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/product_lines.parquet/,product_lines.parquet/,0,0
dbfs:/dbfs/FileStore/tables/case_rnp/products.parquet/,products.parquet/,0,0


###Criando os DataFrames

In [0]:
# Load every table from the Parquet snapshots written by the ingestion loop.
# All tables are read from output_directory — the exact location that loop writes to — so a
# fresh Run All never picks up files from a different directory.
def read_case_parquet(table: str):
    """Return a DataFrame over the `<table>.parquet` dataset inside output_directory on DBFS."""
    return spark.read.parquet(f"dbfs:{output_directory}/{table}.parquet/")


customers = read_case_parquet("customers")
employees = read_case_parquet("employees")
offices = read_case_parquet("offices")
orderdetails = read_case_parquet("orderdetails")
orders = read_case_parquet("orders")
payments = read_case_parquet("payments")
product_lines = read_case_parquet("product_lines")
products = read_case_parquet("products")

###Criando as tabelas no Schema RNP

In [0]:
# Source table name -> DataFrame; insertion order is the order the Delta write loop processes them
dataframes = dict(
    customers=customers,
    employees=employees,
    offices=offices,
    orderdetails=orderdetails,
    orders=orders,
    payments=payments,
    product_lines=product_lines,
    products=products,
)

In [0]:
# Set the Spark SQL session time zone to Brasília time; timestamps and date functions in the following cells are evaluated/rendered in this zone
spark.conf.set("spark.sql.session.timeZone", 'America/Sao_Paulo')

In [0]:
# Create the target schema in the catalog and make it the default for unqualified names
spark.sql("CREATE SCHEMA IF NOT EXISTS rnp")
spark.sql("USE SCHEMA rnp")

# Save each DataFrame as a Delta table with ingestion columns and register it in the catalog
for table_name, df in dataframes.items():
    # Ingestion audit columns. The session time zone is already America/Sao_Paulo, so
    # current_timestamp() is the real ingestion instant and already renders in BRT.
    # Wrapping it in from_utc_timestamp() would shift the stored instant by the zone's UTC
    # offset and could even disagree with dt_ingestao_brt around midnight.
    df_with_timestamp = (
        df.withColumn("ts_ingestao_brt", F.current_timestamp())
        .withColumn("dt_ingestao_brt", F.current_date())
    )

    # Target table name carries the 'tb_' prefix
    full_table_name = f"tb_{table_name}"

    # Storage location of the Delta table
    delta_table_path = f"/delta/rnp/{full_table_name}"

    # Write the data in Delta format
    df_with_timestamp.write.format("delta").mode("overwrite").save(delta_table_path)

    # Register the path-based table as rnp.<full_table_name>; without this step
    # spark.table("rnp.tb_...") and %sql queries cannot resolve it. Same pattern as the
    # result tables created later in the notebook.
    spark.sql(f"CREATE TABLE IF NOT EXISTS rnp.{full_table_name} USING DELTA LOCATION '{delta_table_path}'")

    # Table created successfully
    print(f"Tabela Delta {full_table_name} criada com sucesso.")


Tabela Delta tb_customers criada com sucesso.
Tabela Delta tb_employees criada com sucesso.
Tabela Delta tb_offices criada com sucesso.
Tabela Delta tb_orderdetails criada com sucesso.
Tabela Delta tb_orders criada com sucesso.
Tabela Delta tb_payments criada com sucesso.
Tabela Delta tb_product_lines criada com sucesso.
Tabela Delta tb_products criada com sucesso.


#4. Criar as queries ou o código, utilizando a linguagem de sua preferência, que respondam às seguintes perguntas:

###Qual país possui a maior quantidade de itens cancelados?

In [0]:
# Obtain the Spark session through the builder's getOrCreate()
spark = SparkSession.builder.appName("DeltaTableExample").getOrCreate()

# Source tables registered in the rnp schema
order_details_df, orders_df, customers_df = (
    spark.table(t) for t in ("rnp.tb_orderdetails", "rnp.tb_orders", "rnp.tb_customers")
)

In [0]:
# Cancelled order lines per customer country, highest first.
# NOTE(review): F.count(quantity_ordered) counts order *lines*; if "itens" means units,
# F.sum(order_details_df.quantity_ordered) is the right aggregate — confirm with the requester.
tb_canceled_items_per_country = (
    order_details_df
    .join(orders_df, order_details_df.order_number == orders_df.order_number)
    .join(customers_df, orders_df.customer_number == customers_df.customer_number)
    .where(orders_df.status == 'Cancelled')
    .groupBy(customers_df.country)
    .agg(F.count(order_details_df.quantity_ordered).alias("canceled_items_count"))
    .orderBy(F.desc("canceled_items_count"))
)

# Display the DataFrame defined above (the previous code referenced an undefined name)
display(tb_canceled_items_per_country)

country,canceled_items_count
New Zealand,19
Sweden,16
Spain,16
USA,14
UK,14


In [0]:
# Storage location of the result Delta table in the rnp schema
delta_table_path = "/delta/rnp/tb_canceled_items_per_country"

# Ingestion columns. The session time zone is America/Sao_Paulo, so current_timestamp()
# already renders in BRT; from_utc_timestamp() would shift the stored instant by the UTC offset.
tb_canceled_items_per_country_with_timestamp = (
    tb_canceled_items_per_country
    .withColumn("ts_ingestao_brt", F.current_timestamp())
    .withColumn("dt_ingestao_brt", F.current_date())
)

# Save the result as a Delta table
(
    tb_canceled_items_per_country_with_timestamp.write.format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save(delta_table_path)
)

# Register the table in the catalog
spark.sql(f"CREATE TABLE IF NOT EXISTS rnp.tb_canceled_items_per_country USING DELTA LOCATION '{delta_table_path}'")

print("Tabela Delta 'tb_canceled_items_per_country' criada com sucesso.")


Tabela Delta 'tb_canceled_items_per_country' criada com sucesso.


###Resultado utilizando SQL

In [0]:
%sql

-- Country with the most cancelled items. ORDER BY is required: row order of a stored Delta
-- table is not guaranteed, so LIMIT 1 alone would return an arbitrary country.
select * from rnp.tb_canceled_items_per_country order by canceled_items_count desc limit 1

country,canceled_items_count,ts_ingestao_brt,dt_ingestao_brt
New Zealand,19,2024-09-27T14:59:16.714-0300,2024-09-27


###Qual o faturamento da linha de produto mais vendida, considerando os pedidos com status 'Shipped' realizados no ano de 2005?

In [0]:
# Source tables for the revenue question, all from the rnp schema
order_details_df, orders_df, products_df, product_lines_df = (
    spark.table(t)
    for t in ("rnp.tb_orderdetails", "rnp.tb_orders", "rnp.tb_products", "rnp.tb_product_lines")
)

In [0]:
# Revenue per product line for 'Shipped' orders placed in 2005, highest first.
# Filter and aggregate are named up front so the join chain reads as plain plumbing.
shipped_in_2005 = (orders_df.status == 'Shipped') & (F.year(orders_df.order_date) == 2005)
line_revenue = F.round(
    F.sum(order_details_df.quantity_ordered * order_details_df.price_each), 2
).alias("total_revenue")

tb_total_revenue_per_product_line = (
    order_details_df
    .join(orders_df, order_details_df.order_number == orders_df.order_number)
    .join(products_df, order_details_df.product_code == products_df.product_code)
    .join(product_lines_df, products_df.product_line == product_lines_df.product_line)
    .where(shipped_in_2005)
    .groupBy(product_lines_df.product_line)
    .agg(line_revenue)
    .orderBy(F.desc("total_revenue"))
)

display(tb_total_revenue_per_product_line)

product_line,total_revenue
Classic Cars,603666.99
Vintage Cars,222510.7
Motorcycles,212684.55
Trucks and Buses,182231.45
Planes,109701.56
Ships,62989.19
Trains,22311.26


In [0]:
# Storage location of the result Delta table in the rnp schema
delta_table_path = "/delta/rnp/tb_total_revenue_per_product_line"

# Ingestion columns. The session time zone is America/Sao_Paulo, so current_timestamp()
# already renders in BRT; from_utc_timestamp() would shift the stored instant by the UTC offset.
tb_total_revenue_per_product_line_with_timestamp = (
    tb_total_revenue_per_product_line
    .withColumn("ts_ingestao_brt", F.current_timestamp())
    .withColumn("dt_ingestao_brt", F.current_date())
)

# Save the result as a Delta table
(
    tb_total_revenue_per_product_line_with_timestamp.write.format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save(delta_table_path)
)

# Register the table in the catalog
spark.sql(f"CREATE TABLE IF NOT EXISTS rnp.tb_total_revenue_per_product_line USING DELTA LOCATION '{delta_table_path}'")

print("Tabela Delta 'tb_total_revenue_per_product_line' criada com sucesso.")


Tabela Delta 'tb_total_revenue_per_product_line' criada com sucesso.


###Resultado utilizando SQL

In [0]:
%sql

-- Product line with the highest revenue. ORDER BY is required: row order of a stored Delta
-- table is not guaranteed, so LIMIT 1 alone would return an arbitrary product line.
select product_line,total_revenue from rnp.tb_total_revenue_per_product_line order by total_revenue desc limit 1

product_line,total_revenue
Classic Cars,603666.99


###Traga na consulta o Nome, sobrenome e e-mail dos vendedores do Japão, lembrando que o local-part do e-mail deve estar mascarado.

In [0]:
# Employee and office tables from the rnp schema
employees_df, offices_df = (spark.table(t) for t in ("rnp.tb_employees", "rnp.tb_offices"))

In [0]:
# Employees of Japanese offices: full name plus e-mail with the local part masked.
#
# Masking rule: reveal at most the first 2 characters of the local part, always hide at least
# one of its characters, keep the domain. Addresses without '@' are fully masked rather than
# echoed back (instr() returns 0 there, which made the previous expression print the whole
# address). NULL e-mails stay NULL (no CASE branch matches).
masked_email = F.expr(
    "CASE "
    "WHEN instr(email, '@') > 0 THEN concat("
    "substring(email, 1, greatest(least(2, instr(email, '@') - 2), 0)), "
    "'****', "
    "substring(email, instr(email, '@'))) "
    "WHEN email IS NOT NULL THEN '****' "
    "END"
)

tb_employees_japan = (
    employees_df
    # Inner join: the country filter below discards unmatched offices anyway, so a left join
    # only obscured the intent.
    .join(offices_df, employees_df.office_code == offices_df.office_code, "inner")
    .where(offices_df.country == "Japan")
    .select(
        F.concat(employees_df.first_name, F.lit(" "), employees_df.last_name).alias("employee_name"),
        masked_email.alias("email"),
    )
)

display(tb_employees_japan)

employee_name,email
Mami Nishi,mn****@classicmodelcars.com
Yoshimi Kato,yk****@classicmodelcars.com


In [0]:
# Storage location of the result Delta table in the rnp schema
delta_table_path = "/delta/rnp/tb_employees_japan"

# Ingestion columns. The session time zone is America/Sao_Paulo, so current_timestamp()
# already renders in BRT; from_utc_timestamp() would shift the stored instant by the UTC offset.
tb_employees_japan_with_timestamp = (
    tb_employees_japan
    .withColumn("ts_ingestao_brt", F.current_timestamp())
    .withColumn("dt_ingestao_brt", F.current_date())
)

# Save the result as a Delta table
(
    tb_employees_japan_with_timestamp.write.format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save(delta_table_path)
)

# Register the table in the catalog
spark.sql(f"CREATE TABLE IF NOT EXISTS rnp.tb_employees_japan USING DELTA LOCATION '{delta_table_path}'")

print("Tabela Delta 'tb_employees_japan' criada com sucesso.")

Tabela Delta 'tb_employees_japan' criada com sucesso.


###Resultado utilizando SQL

In [0]:
%sql

-- All rows of the Japan employees result table, including the ingestion columns
SELECT * FROM rnp.tb_employees_japan

employee_name,email,ts_ingestao_brt,dt_ingestao_brt
Mami Nishi,mn****@classicmodelcars.com,2024-09-27T15:29:02.165-0300,2024-09-27
Yoshimi Kato,yk****@classicmodelcars.com,2024-09-27T15:29:02.165-0300,2024-09-27
