##### Resetar todo o Notebook
for caminho in ("/letscode/passador/projeto/", "/user/hive/warehouse/letscode_passador.db"):  
    dbutils.fs.rm(caminho, True)  
spark.sql("DROP DATABASE IF EXISTS letscode_passador CASCADE")

### Obtendo os Dados para a Camada Raw

**Objetivo:** Ingestão dos dados das tabelas do NorthWind para a primeira camada do Data Lake, o diretório `raw`.

In [0]:
# Create the RAW directory (first layer of the data lake)
dbutils.fs.mkdirs("/letscode/passador/projeto/raw")

Out[2]: True

In [0]:
# Collect the CSV files uploaded to FileStore. Filtering by extension (rather
# than removing a fixed list index) keeps directories such as `tables/` out of
# the list no matter where they appear in the listing.
lista_raw = [arquivo for arquivo in dbutils.fs.ls("/FileStore") if arquivo.name.endswith(".csv")]
lista_raw

Out[3]: [FileInfo(path='dbfs:/FileStore/categories.csv', name='categories.csv', size=427, modificationTime=1652625112000),
 FileInfo(path='dbfs:/FileStore/customers.csv', name='customers.csv', size=11564, modificationTime=1652625112000),
 FileInfo(path='dbfs:/FileStore/employee_territories.csv', name='employee_territories.csv', size=417, modificationTime=1652625112000),
 FileInfo(path='dbfs:/FileStore/employees.csv', name='employees.csv', size=4070, modificationTime=1652625112000),
 FileInfo(path='dbfs:/FileStore/order_details.csv', name='order_details.csv', size=40625, modificationTime=1652625113000),
 FileInfo(path='dbfs:/FileStore/orders.csv', name='orders.csv', size=98535, modificationTime=1652625113000),
 FileInfo(path='dbfs:/FileStore/products.csv', name='products.csv', size=4339, modificationTime=1652625113000),
 FileInfo(path='dbfs:/FileStore/region.csv', name='region.csv', size=76, modificationTime=1652625113000),
 FileInfo(path='dbfs:/FileStore/shippers.csv', name='shippers.c

In [0]:
# Move every file collected in lista_raw from FileStore into the RAW layer
# (first layer of the data lake).
for arquivo in lista_raw:
    origem = "/FileStore/" + arquivo.name
    dbutils.fs.mv(origem, "/letscode/passador/projeto/raw/", True)

In [0]:
# List the files now present in the RAW directory
lista_arq = (dbutils.fs.ls("/letscode/passador/projeto/raw/"))

In [0]:
# Validate the ingestion: every file collected from FileStore (lista_raw) must
# now exist in the RAW directory. Checking lista_raw against a listing of RAW
# makes the check meaningful; checking RAW's own listing against itself
# could never fail.
nomes_raw = {item.name for item in dbutils.fs.ls("/letscode/passador/projeto/raw")}
for n, arquivo in enumerate(lista_raw):
    assert arquivo.name in nomes_raw, "File not present in Raw Path: " + arquivo.name
    print(str(n) + ' - ' + arquivo.name + " - Assertion passed.")

0 - categories.csv - Assertion passed.
1 - customers.csv - Assertion passed.
2 - employee_territories.csv - Assertion passed.
3 - employees.csv - Assertion passed.
4 - order_details.csv - Assertion passed.
5 - orders.csv - Assertion passed.
6 - products.csv - Assertion passed.
7 - region.csv - Assertion passed.
8 - shippers.csv - Assertion passed.
9 - suppliers.csv - Assertion passed.
10 - territories.csv - Assertion passed.
11 - us_states.csv - Assertion passed.


In [0]:
# Show the contents of the RAW directory
display(dbutils.fs.ls("/letscode/passador/projeto/raw"))

path,name,size,modificationTime
dbfs:/letscode/passador/projeto/raw/categories.csv,categories.csv,427,1652625124000
dbfs:/letscode/passador/projeto/raw/customers.csv,customers.csv,11564,1652625125000
dbfs:/letscode/passador/projeto/raw/employee_territories.csv,employee_territories.csv,417,1652625125000
dbfs:/letscode/passador/projeto/raw/employees.csv,employees.csv,4070,1652625126000
dbfs:/letscode/passador/projeto/raw/order_details.csv,order_details.csv,40625,1652625127000
dbfs:/letscode/passador/projeto/raw/orders.csv,orders.csv,98535,1652625127000
dbfs:/letscode/passador/projeto/raw/products.csv,products.csv,4339,1652625128000
dbfs:/letscode/passador/projeto/raw/region.csv,region.csv,76,1652625129000
dbfs:/letscode/passador/projeto/raw/shippers.csv,shippers.csv,205,1652625129000
dbfs:/letscode/passador/projeto/raw/suppliers.csv,suppliers.csv,4013,1652625130000


#### Ajuste dos Dados da Camada Raw para a criação e inserção na camada Trusted

**Objetivo:** Correção dos dados das tabelas da camada `raw` para envio à segunda camada do Data Lake, o diretório `trusted`.

In [0]:
# List the files in the RAW directory. The trusted-layer save/registration loops
# pair this list with DataFrames by position, so its order matters.
lista_raw1 = dbutils.fs.ls("/letscode/passador/projeto/raw")
lista_raw1

Out[8]: [FileInfo(path='dbfs:/letscode/passador/projeto/raw/categories.csv', name='categories.csv', size=427, modificationTime=1652625124000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/customers.csv', name='customers.csv', size=11564, modificationTime=1652625125000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/employee_territories.csv', name='employee_territories.csv', size=417, modificationTime=1652625125000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/employees.csv', name='employees.csv', size=4070, modificationTime=1652625126000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/order_details.csv', name='order_details.csv', size=40625, modificationTime=1652625127000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/orders.csv', name='orders.csv', size=98535, modificationTime=1652625127000),
 FileInfo(path='dbfs:/letscode/passador/projeto/raw/products.csv', name='products.csv', size=4339, modificationTime=1652625128000),
 FileInfo(path='dbfs:/letscod

In [0]:
# Load a CSV file from the RAW layer
def tabela(dataframe, raw_dir="/letscode/passador/projeto/raw/"):
    """Read a CSV file from the RAW layer into a Spark DataFrame.

    Parameters
    ----------
    dataframe : str
        File name inside ``raw_dir`` (e.g. ``"orders.csv"``). The parameter name
        is kept for compatibility with existing callers; it holds a file name.
    raw_dir : str, optional
        Directory prefix the file is read from; defaults to the project's RAW layer.

    Returns
    -------
    pyspark.sql.DataFrame
        The CSV contents, using the first line as header and letting Spark
        infer column types (``inferSchema=True``).
    """
    file_path = raw_dir + dataframe
    return spark.read.csv(file_path, header=True, inferSchema=True)

In [0]:
# Load every RAW table by its file name. Naming the file explicitly (instead of
# indexing into lista_raw1) keeps each variable bound to the right table even
# if the directory listing order changes; the type fixes below rely on it.
df0 = tabela("categories.csv")
df1 = tabela("customers.csv")
df2 = tabela("employee_territories.csv")
df3 = tabela("employees.csv")
df4 = tabela("order_details.csv")
df5 = tabela("orders.csv")
df6 = tabela("products.csv")
df7 = tabela("region.csv")
df8 = tabela("shippers.csv")
df9 = tabela("suppliers.csv")
df10 = tabela("territories.csv")
df11 = tabela("us_states.csv")

In [0]:
# Carregando biblioteca Types
from pyspark.sql.types import StringType, DateType, FloatType

In [0]:
# Inspect the inferred schema of the categories table
df0.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- picture: string (nullable = true)



In [0]:
# Inspect the inferred schema of the customers table
df1.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)



In [0]:
# Inspect the inferred schema of the employee_territories table
df2.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- territory_id: integer (nullable = true)



In [0]:
# Cast columns to the intended types: territory_id becomes a string key, the same
# type territories.territory_id is cast to, so the two join in dm_employees.
# NOTE(review): inferSchema already parsed territory_id as an integer, so any
# leading zeros in the CSV (e.g. ZIP-like codes) are lost before this cast —
# if that matters, read the column as string at load time instead.
df2 = df2.withColumn("territory_id", df2["territory_id"].cast('string'))

In [0]:
# Inspect the inferred schema of the employees table
df3.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- title_of_courtesy: string (nullable = true)
 |-- birth_date: timestamp (nullable = true)
 |-- hire_date: timestamp (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- extension: integer (nullable = true)
 |-- photo: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- reports_to: integer (nullable = true)
 |-- photo_path: string (nullable = true)



In [0]:
# Fix column types inferred by Spark: the two timestamp columns become DateType
# and the integer extension column becomes a string.
for coluna in ("birth_date", "hire_date"):
    df3 = df3.withColumn(coluna, df3[coluna].cast(DateType()))
df3 = df3.withColumn("extension", df3["extension"].cast('string'))

In [0]:
# Inspect the inferred schema of the order_details table
df4.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)



In [0]:
# Inspect the inferred schema of the orders table
df5.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- required_date: timestamp (nullable = true)
 |-- shipped_date: timestamp (nullable = true)
 |-- ship_via: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- ship_name: string (nullable = true)
 |-- ship_address: string (nullable = true)
 |-- ship_city: string (nullable = true)
 |-- ship_region: string (nullable = true)
 |-- ship_postal_code: string (nullable = true)
 |-- ship_country: string (nullable = true)



In [0]:
# Convert the three timestamp columns inferred by Spark to DateType.
for coluna in ("order_date", "required_date", "shipped_date"):
    df5 = df5.withColumn(coluna, df5[coluna].cast(DateType()))

In [0]:
# Inspect the inferred schema of the products table
df6.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- quantity_per_unit: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- units_in_stock: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- reorder_level: integer (nullable = true)
 |-- discontinued: integer (nullable = true)



In [0]:
# Inspect the inferred schema of the region table
df7.printSchema()

root
 |-- region_id: integer (nullable = true)
 |-- region_description: string (nullable = true)



In [0]:
# Store region_id as a string so it has the same type as territories.region_id
# (also cast to string) for the region join in dm_employees.
df7 = df7.withColumn("region_id", df7["region_id"].cast('string'))

In [0]:
# Inspect the inferred schema of the shippers table
df8.printSchema()

root
 |-- shipper_id: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- phone: string (nullable = true)



In [0]:
# Inspect the inferred schema of the suppliers table
df9.printSchema()

root
 |-- supplier_id: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)
 |-- homepage: string (nullable = true)



In [0]:
# Inspect the inferred schema of the territories table
df10.printSchema()

root
 |-- territory_id: integer (nullable = true)
 |-- territory_description: string (nullable = true)
 |-- region_id: integer (nullable = true)



In [0]:
# Store both keys as strings so they share a type with
# employee_territories.territory_id and region.region_id (both cast to string).
for coluna in ("territory_id", "region_id"):
    df10 = df10.withColumn(coluna, df10[coluna].cast('string'))

In [0]:
# Inspect the inferred schema of the us_states table
df11.printSchema()

root
 |-- state_id: integer (nullable = true)
 |-- state_name: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- state_region: string (nullable = true)



### Camada Trusted

**Objetivo:** Inserção dos dados corrigidos das tabelas da camada `raw` na segunda camada do Data Lake, o diretório `trusted`.

In [0]:
# Write a DataFrame to the trusted layer as Parquet
def w_parquet(dataframe, nome):
    """Save ``dataframe`` as Parquet under the trusted layer.

    The output directory is ``/letscode/passador/projeto/trusted/<nome>``;
    existing data there is replaced (``mode("overwrite")``). Returns None.
    """
    destino = "/letscode/passador/projeto/trusted/" + nome
    writer = dataframe.write.mode("overwrite").format("parquet")
    writer.save(destino)

In [0]:
import re # used to strip the ".csv" extension from file names in the loops below
lista_df = [df0, df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11] # the DataFrames themselves (not names), in the same order as lista_raw1 so the save loop can pair them by index

In [0]:
# Disable the Databricks Delta format check for this session.
# NOTE(review): presumably needed so Parquet reads/writes on the trusted paths
# don't fail once those paths have been converted to Delta — confirm.
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

In [0]:
# Save every table as Parquet in the trusted layer (second layer of the data lake),
# one directory per table, named after its CSV file without the extension.
# lista_df is aligned by position with lista_raw1, so fail loudly if they diverge.
assert len(lista_raw1) == len(lista_df), "lista_raw1 and lista_df are out of sync"
for arquivo, df_tabela in zip(lista_raw1, lista_df):
    # Escaped and anchored: only a trailing ".csv" is removed (the unescaped
    # pattern ".csv" would match any character followed by "csv", anywhere).
    nome_tabela = re.sub(r"\.csv$", "", arquivo.name)
    w_parquet(df_tabela, nome_tabela)

In [0]:
# Configure the database: create it if it does not exist and make it the current one
spark.sql("CREATE DATABASE IF NOT EXISTS letscode_passador")
spark.sql("USE letscode_passador")

Out[33]: DataFrame[]

#### Registrando as Tabelas Parquet no Metastore

for arquivo in lista_raw1:  
    nome_tabela = re.sub(r"\.csv$", "", arquivo.name)  # strip only a trailing ".csv" (escaped, anchored pattern)  
    spark.sql(f"DROP TABLE IF EXISTS {nome_tabela}")  # re-create so the table always points at the current files  
    spark.sql(  
        f"""  
        CREATE TABLE {nome_tabela}  
        USING PARQUET  
        LOCATION "/letscode/passador/projeto/trusted/{nome_tabela}"  
        """  
    )

#### Convertendo as Tabelas Parquet existentes em Tabelas Delta

In [0]:
from delta.tables import DeltaTable

In [0]:
lista_parquet = dbutils.fs.ls("/letscode/passador/projeto/trusted") # list the table directories in the TRUSTED layer (names end with "/")
lista_parquet

Out[35]: [FileInfo(path='dbfs:/letscode/passador/projeto/trusted/categories/', name='categories/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/customers/', name='customers/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/employee_territories/', name='employee_territories/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/employees/', name='employees/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/order_details/', name='order_details/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/orders/', name='orders/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/products/', name='products/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passador/projeto/trusted/region/', name='region/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/letscode/passad

In [0]:
# Convert each Parquet table directory of the trusted layer to Delta, in place.
for pasta in lista_parquet:
    caminho = f"/letscode/passador/projeto/trusted/{pasta.name}"
    DeltaTable.convertToDelta(spark, f"parquet.`{caminho}`")

#### Registrando as Tabelas Delta

Como os registros feitos anteriormente no Metastore apontavam para tabelas Parquet, é preciso atualizá-los para Delta, registrando as tabelas novamente.

In [0]:
import re

In [0]:
# Re-register every trusted table in the metastore, now as a Delta table.
for pasta in lista_parquet:
    nome_tabela = pasta.name.replace("/", "")  # "orders/" -> "orders"
    spark.sql(f"DROP TABLE IF EXISTS {nome_tabela}")
    spark.sql(f"""
        CREATE TABLE {nome_tabela}
        USING DELTA
        LOCATION "/letscode/passador/projeto/trusted/{pasta.name}"
        """)

### Camada Refined

**Objetivo:** Criação das tabelas fato, dimensão e agregação para a terceira camada do Data Lake, o diretório `refined`.

In [0]:
# Load the registered tables used to build the fact table
orders = spark.table("orders")
order_details = spark.table("order_details")

In [0]:
# Inspect the order_details columns available for the join
order_details.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)



#### Criação da Tabela Fato Orders

In [0]:
%sql
-- Fact table: orders joined to their order lines. LEFT JOIN keeps an order with
-- no lines as a single row whose line columns are NULL.
-- Dropped first, like the dimension tables, so the cell can be re-run.
DROP TABLE IF EXISTS ft_orders;
CREATE TABLE ft_orders AS
SELECT o.*, od.product_id, od.unit_price, od.quantity, od.discount
FROM orders AS o
LEFT JOIN order_details AS od ON o.order_id = od.order_id

num_affected_rows,num_inserted_rows


In [0]:
# Persist the fact table to the refined layer as Delta files (overwriting).
ft_orders = spark.table("ft_orders")
ft_orders.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/ft_orders/"
)

#### Criação das Tabelas Dimensão

In [0]:
# Load the registered tables used to build the dimension tables
categories = spark.table("categories")
customers = spark.table("customers")
employee_territories = spark.table("employee_territories")
employees = spark.table("employees")
products = spark.table("products")
region = spark.table("region")
shippers = spark.table("shippers")
suppliers = spark.table("suppliers")
territories = spark.table("territories")
us_states = spark.table("us_states")

#### Dimensão Products

In [0]:
%sql
-- Products dimension: products enriched with their category attributes, keyed
-- by a surrogate key generated by the table (identity column).
DROP TABLE IF EXISTS dm_products;
CREATE TABLE dm_products(
     sk_products BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
     product_id int not null,
     product_name string not null,
     supplier_id int not null,
     category_name string not null,
     description string not null,
     picture string not null,
     category_id int not null,
     quantity_per_unit string,
     unit_price double not null,
     units_in_stock int not null,
     units_on_order int not null,
     reorder_level int not null,
     discontinued int not null
     );

-- Source columns are named explicitly (not p.*), so each value lands in the
-- column of the same name regardless of the physical column order of products.
-- category_name/description/picture are NOT NULL, so a product without a
-- matching category makes this INSERT fail rather than load NULLs.
INSERT INTO dm_products (product_id, product_name, supplier_id, category_id, quantity_per_unit, unit_price, units_in_stock, units_on_order, reorder_level, discontinued, category_name, description, picture)
SELECT p.product_id, p.product_name, p.supplier_id, p.category_id, p.quantity_per_unit, p.unit_price,
       p.units_in_stock, p.units_on_order, p.reorder_level, p.discontinued,
       c.category_name, c.description, c.picture
FROM products AS p
LEFT JOIN categories AS c ON p.category_id = c.category_id

num_affected_rows,num_inserted_rows
77,77


In [0]:
# Persist the products dimension to the refined layer as Delta files (overwriting).
dm_products = spark.table("dm_products")
dm_products.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/dm_products/"
)

#### Dimensão Employees

In [0]:
%sql
-- Employees dimension: employee attributes plus the description of each
-- territory they cover and that territory's sales region, keyed by a surrogate
-- key generated by the table (identity column).
DROP TABLE IF EXISTS dm_employees;
CREATE TABLE dm_employees(
     sk_employees BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
     employee_id int not null,
     last_name string not null,
     first_name string not null,
     title string not null,
     title_of_courtesy string not null,
     birth_date date not null,
     hire_date date not null,
     address string not null,
     city string not null,
     -- region: the employee's own address region (employees.region)
     region string,
     -- region_description: the region of the territory (territories -> region)
     region_description string,
     territory_description string,
     postal_code string not null,
     country string not null,
     home_phone string not null,
     extension string not null,
     photo string not null,
     notes string not null,
     reports_to int,
     photo_path string not null
     );

-- NOTE(review): the joins yield one row per (employee, territory) pair, so
-- employee_id repeats whenever an employee covers several territories (the cell
-- output reports 49 rows). Joining this dimension to ft_orders on employee_id
-- would then multiply fact rows — confirm this grain is intended. The INNER
-- JOINs also drop employees that have no territory.
INSERT INTO dm_employees (employee_id, last_name, first_name, title, title_of_courtesy, birth_date, hire_date, address, city, region, postal_code, country, home_phone, extension, photo, notes, reports_to, photo_path, region_description, territory_description) 
SELECT em.employee_id, em.last_name, em.first_name, em.title, em.title_of_courtesy, em.birth_date, em.hire_date, em.address, em.city, em.region, em.postal_code, em.country, em.home_phone, em.extension, em.photo, em.notes, em.reports_to, em.photo_path, rg.region_description, t.territory_description
FROM employees as em 
INNER JOIN employee_territories AS emt ON em.employee_id = emt.employee_id
INNER JOIN territories AS t ON emt.territory_id = t.territory_id
INNER JOIN region AS rg ON t.region_id = rg.region_id 

num_affected_rows,num_inserted_rows
49,49


In [0]:
# Persist the employees dimension to the refined layer as Delta files (overwriting).
dm_employees = spark.table("dm_employees")
dm_employees.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/dm_employees/"
)

#### Dimensão Customers

In [0]:
%sql
-- Customers dimension keyed by a surrogate key generated by the table (identity column).
DROP TABLE IF EXISTS dm_customers;
CREATE TABLE dm_customers(
     sk_customers BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
     customer_id string not null,
     company_name string not null,
     contact_name string not null,
     contact_title string not null,
     address string not null,
     city string not null,
     region string,
     postal_code string,
     -- country was previously dropped even though customers has it; kept
     -- nullable so missing values do not make the load fail.
     country string,
     phone string not null,
     fax string
     );

INSERT INTO dm_customers (customer_id, company_name, contact_name, contact_title, address, city, region, postal_code, country, phone, fax) 
SELECT customer_id, company_name, contact_name, contact_title, address, city, region, postal_code, country, phone, fax FROM customers

num_affected_rows,num_inserted_rows
91,91


In [0]:
# Persist the customers dimension to the refined layer as Delta files (overwriting).
dm_customers = spark.table("dm_customers")
dm_customers.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/dm_customers/"
)

#### Dimensão Suppliers

In [0]:
%sql
-- Suppliers dimension keyed by a surrogate key generated by the table (identity column).
DROP TABLE IF EXISTS dm_suppliers;
CREATE TABLE dm_suppliers(
     sk_suppliers BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
     supplier_id int not null,
     company_name string not null,
     contact_name string not null,
     contact_title string not null,
     address string not null,
     city string not null,
     region string,
     postal_code string not null,
     -- country was previously dropped even though suppliers has it; kept
     -- nullable so missing values do not make the load fail.
     country string,
     phone string not null,
     fax string,
     homepage string
     );

INSERT INTO dm_suppliers (supplier_id, company_name, contact_name, contact_title, address, city, region, postal_code, country, phone, fax, homepage) 
SELECT supplier_id, company_name, contact_name, contact_title, address, city, region, postal_code, country, phone, fax, homepage FROM suppliers

num_affected_rows,num_inserted_rows
29,29


In [0]:
# Persist the suppliers dimension to the refined layer as Delta files (overwriting).
dm_suppliers = spark.table("dm_suppliers")
dm_suppliers.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/dm_suppliers/"
)

#### Dimensão Shippers

In [0]:
%sql
-- Shippers dimension keyed by a surrogate key generated by the table (identity column).
DROP TABLE IF EXISTS dm_shippers;
CREATE TABLE dm_shippers (
    sk_shippers  BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
    shipper_id   int    not null,
    company_name string not null,
    phone        string not null
);

INSERT INTO dm_shippers (shipper_id, company_name, phone)
SELECT s.shipper_id, s.company_name, s.phone
FROM shippers AS s

num_affected_rows,num_inserted_rows
6,6


In [0]:
# Persist the shippers dimension to the refined layer as Delta files (overwriting).
dm_shippers = spark.table("dm_shippers")
dm_shippers.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/dm_shippers/"
)

#### Tabela Agregada Total Vendas Mensal

In [0]:
%sql
-- Monthly sales aggregate: SUM(unit_price * quantity) per (month, year) of
-- required_date, cast to NUMERIC(15,2).
-- NOTE(review): sales are bucketed by required_date rather than order_date, and
-- the total ignores ft_orders.discount (gross value) — confirm both are intended.
-- Dropped first, like the dimension tables, so the cell can be re-run.
DROP TABLE IF EXISTS agg_mount_sales;
CREATE TABLE agg_mount_sales AS
SELECT DATE_FORMAT(required_date, 'MM') as Mes, DATE_FORMAT(required_date, 'y') as Ano, cast(SUM((unit_price * quantity)) as NUMERIC(15,2)) as Total_Mes
from ft_orders
GROUP BY 1,2
ORDER BY 2, 1

num_affected_rows,num_inserted_rows


In [0]:
# Persist the monthly sales aggregate to the refined layer as Delta files (overwriting).
agg_mount_sales = spark.table("agg_mount_sales")
agg_mount_sales.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/agg_mount_sales/"
)

In [0]:
# Show the monthly sales aggregate
display(agg_mount_sales)

Mes,Ano,Total_Mes
7,1996,1444.8
8,1996,31769.0
9,1996,23709.3
10,1996,31103.0
11,1996,40625.0
12,1996,49400.4
1,1997,57859.5
2,1997,54058.6
3,1997,41275.3
4,1997,45417.5


#### Tabela Agregada de Produtos (R$) por Mês

In [0]:
%sql
-- Per-product averages over the order lines in ft_orders:
--   Media_Faturamento_Prod_Mes  = AVG(unit_price * quantity) per order line
--   Media_Quantidade_Vendas_Mes = AVG(quantity) per order line
--   Media_Preco                 = ratio of the two, i.e. a quantity-weighted
--                                 average price when price and quantity are non-null
-- NOTE(review): despite the "_Mes" suffixes nothing is grouped by month — confirm the intent.
-- Dropped first, like the dimension tables, so the cell can be re-run.
DROP TABLE IF EXISTS agg_aver_prod;
CREATE TABLE agg_aver_prod AS
SELECT pr.product_name as Descricao_Produto, cast(AVG((ft.unit_price * ft.quantity)) as NUMERIC(15,2)) as Media_Faturamento_Prod_Mes, cast(avg(ft.quantity) as NUMERIC(15,2)) as Media_Quantidade_Vendas_Mes, cast(AVG((ft.unit_price * ft.quantity))/avg(ft.quantity) as NUMERIC(15,2)) as Media_Preco
FROM ft_orders as ft
INNER JOIN products AS pr ON ft.product_id = pr.product_id
GROUP BY 1
ORDER BY 2 DESC

num_affected_rows,num_inserted_rows


In [0]:
# Persist the per-product averages to the refined layer as Delta files (overwriting).
agg_aver_prod = spark.table("agg_aver_prod")
agg_aver_prod.write.mode("overwrite").format("delta").save(
    "/letscode/passador/projeto/refined/agg_aver_prod/"
)

In [0]:
# Show the per-product averages
display(agg_aver_prod)

Descricao_Produto,Media_Faturamento_Prod_Mes,Media_Quantidade_Vendas_Mes,Media_Preco
Côte de Blaye,6249.34,25.96,240.75
Thüringer Rostbratwurst,2741.76,23.31,117.61
Mishi Kobe Niku,1765.4,19.0,92.92
Schoggi Schokolade,1692.39,40.56,41.73
Sir Rodney's Marmalade,1477.24,19.56,75.51
Raclette Courdavault,1412.89,27.7,51.0
Carnarvon Tigers,1184.72,19.96,59.35
Manjimup Dried Apples,1147.25,22.72,50.5
Northwoods Cranberry Sauce,1058.46,28.62,36.99
Vegie-spread,1040.96,26.18,39.77
