# Criando um Data Lakehouse

## 3. Criando tabelas Delta

### 3.1 Configuração

In [0]:
username = 'thais'

northwind = f'/letscode/{username}/northwind/'

# Primeiramente, usaremos este banco de dados apenas para converter as tabelas para Delta.
spark.sql(f'USE letscode_{username}_trusted')

# Depois, criaremos outro banco de dados para criar as tabelas fato, agregadas e dimensões.

Out[94]: DataFrame[]

### 3.2 Convertendo tabelas em Parquet para Delta

In [0]:
%sql
CONVERT TO DELTA categories;
CONVERT TO DELTA customers;
CONVERT TO DELTA employee_territories;
CONVERT TO DELTA employees;
CONVERT TO DELTA order_details;
CONVERT TO DELTA orders;
CONVERT TO DELTA products;
CONVERT TO DELTA region;
CONVERT TO DELTA shippers;
CONVERT TO DELTA suppliers;
CONVERT TO DELTA territories;
CONVERT TO DELTA us_states

### 3.3 Registrando as tabelas no Metastore

In [0]:
path_trusted = northwind + 'trusted/'

for folder in dbutils.fs.ls(path_trusted):
    folder_name = f'{folder.name}'.rsplit('/', 1)[0]
    spark.sql(
        f"""
        DROP TABLE IF EXISTS {folder_name}
        """
    )
    spark.sql(
        f"""
        CREATE TABLE {folder_name}
        USING DELTA
        LOCATION "{northwind}trusted/{folder.name}"
        """
    )

### 3.4 Exibindo os atributos da tabela "categories"

In [0]:
%sql
DESCRIBE EXTENDED categories;

col_name,data_type,comment
category_id,int,
category_name,string,
description,string,
picture,string,
,,
# Partitioning,,
Not partitioned,,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


### 3.5 Manipulando as tabelas para criar um Data Warehouse

#### 3.5.1 Lendo as tabelas em DFs

In [0]:
df_orders = spark.read.table('orders')
df_order_details = spark.read.table('order_details')
df_customers = spark.read.table('customers')
df_employees = spark.read.table('employees')
df_products = spark.read.table('products')
df_shippers = spark.read.table('shippers')
df_us_states = spark.read.table('us_states')
df_employee_territories = spark.read.table('employee_territories')
df_territories = spark.read.table('territories')
df_region = spark.read.table('region')
df_categories = spark.read.table('categories')
df_suppliers = spark.read.table('suppliers')

#### 3.5.2 Criando "surrogate keys" nas tabelas que serão definidas como dimensão.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

# Criando "surrogate keys"
dm_customers = df_customers.withColumn("sk_customers",row_number().over(Window.orderBy(monotonically_increasing_id())))
dm_employees = df_employees.withColumn("sk_employees",row_number().over(Window.orderBy(monotonically_increasing_id())))
dm_products = df_products.withColumn("sk_products",row_number().over(Window.orderBy(monotonically_increasing_id())))
dm_shippers = df_shippers.withColumn("sk_shippers",row_number().over(Window.orderBy(monotonically_increasing_id())))
dm_us_states = df_us_states.withColumn("sk_us_states",row_number().over(Window.orderBy(monotonically_increasing_id())))

#### 3.5.3 Alterando o nome das colunas

In [0]:
dm_customers = dm_customers.withColumnRenamed('customer_id','nk_customers')
dm_employees = dm_employees.withColumnRenamed('employee_id','nk_employees')
dm_products = dm_products.withColumnRenamed('product_id','nk_products')
dm_shippers = dm_shippers.withColumnRenamed('shipper_id','nk_shippers')
dm_us_states = dm_us_states.withColumnRenamed('state_id','nk_us_states')
df_employee_territories = df_employee_territories.withColumnRenamed('employee_id','nk_employees')

df_suppliers = df_suppliers.toDF('supplier_id','sup_company_name', 'sup_contact_name', 'sup_contact_title', 'sup_address', 'sup_city', 'sup_region', 'sup_postal_code', 'sup_country', 'sup_phone', 'sup_fax', 'sup_homepage')

In [0]:
# Verificando um dos DFs alterados
display(dm_customers.toPandas().head())

nk_customers,company_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax,sk_customers
ALFKI,Alfreds Futterkiste,Maria Anders,Sales Representative,Obere Str. 57,Berlin,,12209,Germany,030-0074321,030-0076545,1
ANATR,Ana Trujillo Emparedados y helados,Ana Trujillo,Owner,Avda. de la Constitución 2222,México D.F.,,05021,Mexico,(5) 555-4729,(5) 555-3745,2
ANTON,Antonio Moreno Taquería,Antonio Moreno,Owner,Mataderos 2312,México D.F.,,05023,Mexico,(5) 555-3932,,3
AROUT,Around the Horn,Thomas Hardy,Sales Representative,120 Hanover Sq.,London,,WA1 1DP,UK,(171) 555-7788,(171) 555-6750,4
BERGS,Berglunds snabbköp,Christina Berglund,Order Administrator,Berguvsvägen 8,Luleå,,S-958 22,Sweden,0921-12 34 65,0921-12 34 67,5


#### 3.5.4 Unindo "orders" e "order_details"

In [0]:
# Unindo os DFs
df_orders_2 = df_orders.join(df_order_details,['order_id'],how='inner')

# Alterando o nome das colunas
df_orders_2 = df_orders_2.withColumnRenamed('order_id','ft_orders_id').withColumnRenamed('customer_id','nk_customers').withColumnRenamed('employee_id','nk_employees').withColumnRenamed('product_id','nk_products')

display(df_orders_2.toPandas().head())

ft_orders_id,nk_customers,nk_employees,order_date,order_year,order_month,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country,nk_products,unit_price,quantity,discount
10248,VINET,5,1996-07-04T00:00:00.000+0000,1996,7,1996-08-01T00:00:00.000+0000,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,72,34.8,5,0.0
10248,VINET,5,1996-07-04T00:00:00.000+0000,1996,7,1996-08-01T00:00:00.000+0000,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,42,9.8,10,0.0
10248,VINET,5,1996-07-04T00:00:00.000+0000,1996,7,1996-08-01T00:00:00.000+0000,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,11,14.0,12,0.0
10249,TOMSP,6,1996-07-05T00:00:00.000+0000,1996,7,1996-08-16T00:00:00.000+0000,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,51,42.4,40,0.0
10249,TOMSP,6,1996-07-05T00:00:00.000+0000,1996,7,1996-08-16T00:00:00.000+0000,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,14,18.6,9,0.0


#### 3.5.5 Unindo "orders_2", "customers", "employees" e "products"
Essas tabelas serão unidas apenas para adicionar as "surrogate keys" e remover as "natural keys" na tabela fato.

In [0]:
# Unindo os DFs
df_temp_1 = df_orders_2.join(dm_customers,['nk_customers'],how='inner')
df_temp_2 = df_temp_1.join(dm_employees,['nk_employees'],how='inner')
df_temp_3 = df_temp_2.join(dm_products,['nk_products','unit_price'],how='inner')

# Selecionando as colunas da tabela fato
ft_orders = df_temp_3.select(
  'ft_orders_id',
  'sk_customers',
  'sk_employees',
  'sk_products',
  'order_date',
  'order_year',
  'order_month',
  'required_date',
  'shipped_date',
  'ship_via',
  'freight',
  'ship_name',
  'ship_address',
  'ship_city',
  'ship_region',
  'ship_postal_code',
  'ship_country',
  'unit_price',
  'quantity',
  'discount'
)

display(ft_orders.toPandas().head())

ft_orders_id,sk_customers,sk_employees,sk_products,order_date,order_year,order_month,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country,unit_price,quantity,discount
10248,85,5,72,1996-07-04T00:00:00.000+0000,1996,7,1996-08-01T00:00:00.000+0000,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,34.8,5,0.0
10498,35,8,42,1997-04-07T00:00:00.000+0000,1997,4,1997-05-05T00:00:00.000+0000,1997-04-11,2,29.75,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela,14.0,30,0.0
10498,35,8,40,1997-04-07T00:00:00.000+0000,1997,4,1997-05-05T00:00:00.000+0000,1997-04-11,2,29.75,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela,18.4,5,0.0
10498,35,8,24,1997-04-07T00:00:00.000+0000,1997,4,1997-05-05T00:00:00.000+0000,1997-04-11,2,29.75,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela,4.5,14,0.0
10499,46,4,49,1997-04-08T00:00:00.000+0000,1997,4,1997-05-06T00:00:00.000+0000,1997-04-16,2,102.02,LILA-Supermercado,Carrera 52 con Ave. Bolívar #65-98 Llano Largo,Barquisimeto,Lara,3508,Venezuela,20.0,25,0.0


#### 3.5.6 Unindo "employees", "employee_territories", "territories" e "region"

In [0]:
# Unindo os DFs
df_temp_1 = dm_employees.join(df_employee_territories,['nk_employees'],how='inner')
df_temp_2 = df_temp_1.join(df_territories,['territory_id'],how='inner')
df_temp_3 = df_temp_2.join(df_region,['region_id'],how='inner')

# Selecionando as colunas da tabela
dm_employees = df_temp_3.select(
  'sk_employees',
  'nk_employees',
  'last_name',
  'first_name',
  'title',
  'title_of_courtesy',
  'birth_date',
  'hire_date',
  'address',
  'city',
  'territory_description',
  'region',
  'region_description',
  'postal_code',
  'country',
  'home_phone',
  'extension',
  'photo',
  'notes',
  'reports_to',
  'photo_path'
)

display(dm_employees.toPandas().head())

sk_employees,nk_employees,last_name,first_name,title,title_of_courtesy,birth_date,hire_date,address,city,territory_description,region,region_description,postal_code,country,home_phone,extension,photo,notes,reports_to,photo_path
1,1,Davolio,Nancy,Sales Representative,Ms.,1948-12-08T00:00:00.000+0000,1992-05-01T00:00:00.000+0000,507 - 20th Ave. E.\nApt. 2A,Seattle,Neward,WA,Eastern,98122,USA,(206) 555-9857,5467,binary data,Education includes a BA in psychology from Colorado State University in 1970. She also completed The Art of the Cold Call. Nancy is a member of Toastmasters International.,2.0,http://accweb/emmployees/davolio.bmp
1,1,Davolio,Nancy,Sales Representative,Ms.,1948-12-08T00:00:00.000+0000,1992-05-01T00:00:00.000+0000,507 - 20th Ave. E.\nApt. 2A,Seattle,Wilton,WA,Eastern,98122,USA,(206) 555-9857,5467,binary data,Education includes a BA in psychology from Colorado State University in 1970. She also completed The Art of the Cold Call. Nancy is a member of Toastmasters International.,2.0,http://accweb/emmployees/davolio.bmp
2,2,Fuller,Andrew,"Vice President, Sales",Dr.,1952-02-19T00:00:00.000+0000,1992-08-14T00:00:00.000+0000,908 W. Capital Way,Tacoma,Louisville,WA,Eastern,98401,USA,(206) 555-9482,3457,binary data,"Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981. He is fluent in French and Italian and reads German. He joined the company as a sales representative, was promoted to sales manager in January 1992 and to vice president of sales in March 1993. Andrew is a member of the Sales Management Roundtable, the Seattle Chamber of Commerce, and the Pacific Rim Importers Association.",,http://accweb/emmployees/fuller.bmp
2,2,Fuller,Andrew,"Vice President, Sales",Dr.,1952-02-19T00:00:00.000+0000,1992-08-14T00:00:00.000+0000,908 W. Capital Way,Tacoma,Braintree,WA,Eastern,98401,USA,(206) 555-9482,3457,binary data,"Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981. He is fluent in French and Italian and reads German. He joined the company as a sales representative, was promoted to sales manager in January 1992 and to vice president of sales in March 1993. Andrew is a member of the Sales Management Roundtable, the Seattle Chamber of Commerce, and the Pacific Rim Importers Association.",,http://accweb/emmployees/fuller.bmp
2,2,Fuller,Andrew,"Vice President, Sales",Dr.,1952-02-19T00:00:00.000+0000,1992-08-14T00:00:00.000+0000,908 W. Capital Way,Tacoma,Cambridge,WA,Eastern,98401,USA,(206) 555-9482,3457,binary data,"Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981. He is fluent in French and Italian and reads German. He joined the company as a sales representative, was promoted to sales manager in January 1992 and to vice president of sales in March 1993. Andrew is a member of the Sales Management Roundtable, the Seattle Chamber of Commerce, and the Pacific Rim Importers Association.",,http://accweb/emmployees/fuller.bmp


#### 3.5.7 Unindo "products", "categories" e "suppliers"

In [0]:
# Unindo os DFs
df_temp_1 = dm_products.join(df_categories,['category_id'],how='inner')
df_temp_2 = df_temp_1.join(df_suppliers,['supplier_id'],how='inner')

# Selecionando as colunas da tabela
dm_products = df_temp_2.select(
  'sk_products',
  'nk_products',
  'product_name',
  'category_name',
  'description',
  'picture',
  'quantity_per_unit',
  'unit_price',
  'units_in_stock',
  'units_on_order',
  'reorder_level',
  'discontinued',
  'sup_company_name',
  'sup_contact_name',
  'sup_contact_title',
  'sup_address',
  'sup_city',
  'sup_region',
  'sup_postal_code',
  'sup_country',
  'sup_phone',
  'sup_fax',
  'sup_homepage'
)

display(dm_products.toPandas().head())

sk_products,nk_products,product_name,category_name,description,picture,quantity_per_unit,unit_price,units_in_stock,units_on_order,reorder_level,discontinued,sup_company_name,sup_contact_name,sup_contact_title,sup_address,sup_city,sup_region,sup_postal_code,sup_country,sup_phone,sup_fax,sup_homepage
1,1,Chai,Beverages,"Soft drinks, coffees, teas, beers, and ales",binary data,10 boxes x 30 bags,18.0,39,0,10,1,"Specialty Biscuits, Ltd.",Peter Wilson,Sales Representative,29 King's Way,Manchester,,M14 GSD,UK,(161) 555-4448,,
2,2,Chang,Beverages,"Soft drinks, coffees, teas, beers, and ales",binary data,24 - 12 oz bottles,19.0,17,40,25,1,Exotic Liquids,Charlotte Cooper,Purchasing Manager,49 Gilbert St.,London,,EC1 4SD,UK,(171) 555-2222,,
3,3,Aniseed Syrup,Condiments,"Sweet and savory sauces, relishes, spreads, and seasonings",binary data,12 - 550 ml bottles,10.0,13,70,25,0,Exotic Liquids,Charlotte Cooper,Purchasing Manager,49 Gilbert St.,London,,EC1 4SD,UK,(171) 555-2222,,
4,4,Chef Anton's Cajun Seasoning,Condiments,"Sweet and savory sauces, relishes, spreads, and seasonings",binary data,48 - 6 oz jars,22.0,53,0,0,0,New Orleans Cajun Delights,Shelley Burke,Order Administrator,P.O. Box 78934,New Orleans,LA,70117,USA,(100) 555-4822,,#CAJUN.HTM#
5,5,Chef Anton's Gumbo Mix,Condiments,"Sweet and savory sauces, relishes, spreads, and seasonings",binary data,36 boxes,21.35,0,0,0,1,New Orleans Cajun Delights,Shelley Burke,Order Administrator,P.O. Box 78934,New Orleans,LA,70117,USA,(100) 555-4822,,#CAJUN.HTM#


### 3.6 Limpeza
Removendo diretórios, caso este notebook seja executado mais de uma vez.

In [0]:
dbutils.fs.rm(northwind + "refined/ft_orders", recurse=True)

dbutils.fs.rm(northwind + "refined/ft_year_orders", recurse=True)
dbutils.fs.rm(northwind + "refined/ft_month_orders", recurse=True)

dbutils.fs.rm(northwind + "refined/dm_employees", recurse=True)
dbutils.fs.rm(northwind + "refined/dm_products", recurse=True)
dbutils.fs.rm(northwind + "refined/dm_customers", recurse=True)
dbutils.fs.rm(northwind + "refined/dm_shippers", recurse=True)
dbutils.fs.rm(northwind + "refined/dm_us_states", recurse=True)

Out[104]: False

### 3.7 Gravando os arquivos Delta

In [0]:
(ft_orders.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/ft_orders'))

(dm_employees.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/dm_employees'))

(dm_products.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/dm_products'))

(dm_customers.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/dm_customers'))

(dm_shippers.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/dm_shippers'))

(dm_us_states.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/dm_us_states'))

### 3.8 Criando um banco de dados para armazenar as tabelas do DW

In [0]:
spark.sql(f'CREATE DATABASE IF NOT EXISTS letscode_{username}_refined')
spark.sql(f'USE letscode_{username}_refined')

Out[106]: DataFrame[]

### 3.9 Registrando as tabela Delta no Metastore

In [0]:
spark.sql(f'DROP TABLE IF EXISTS ft_orders')
spark.sql(f'DROP TABLE IF EXISTS dm_customers')
spark.sql(f'DROP TABLE IF EXISTS dm_employees')
spark.sql(f'DROP TABLE IF EXISTS dm_products')
spark.sql(f'DROP TABLE IF EXISTS dm_shippers')
spark.sql(f'DROP TABLE IF EXISTS dm_us_states')


spark.sql(
    f"""
    CREATE TABLE ft_orders
    USING DELTA
    LOCATION "{northwind}refined/ft_orders"
    """
)
spark.sql(
    f"""
    CREATE TABLE dm_customers
    USING DELTA
    LOCATION "{northwind}refined/dm_customers"
    """
)
spark.sql(
    f"""
    CREATE TABLE dm_employees
    USING DELTA
    LOCATION "{northwind}refined/dm_employees"
    """
)
spark.sql(
    f"""
    CREATE TABLE dm_products
    USING DELTA
    LOCATION "{northwind}refined/dm_products"
    """
)
spark.sql(
    f"""
    CREATE TABLE dm_shippers
    USING DELTA
    LOCATION "{northwind}refined/dm_shippers"
    """
)
spark.sql(
    f"""
    CREATE TABLE dm_us_states
    USING DELTA
    LOCATION "{northwind}refined/dm_us_states"
    """
)


Out[107]: DataFrame[]

### 3.10 Criando dois DataFrames agregados

In [0]:
from pyspark.sql.functions import sum, col, avg

# Agregando o valor total de vendas por ano, sem levar o frete em consideração
ft_year_orders = (
  ft_orders
  .groupby('order_year')
  .agg((sum(col('unit_price') * col('quantity') - col('discount'))).alias('total_sales'))
  .sort(('order_year'))
)

# Agregando o valor total de vendas por mês, sem levar o frete em consideração
ft_month_orders = (
  ft_orders
  .groupby('order_year','order_month')
  .agg((sum(col('unit_price') * col('quantity') - col('discount'))).alias('total_sales'))
  .sort((['order_year','order_month']))
)

In [0]:
display(ft_year_orders)

order_year,total_sales
1996,174.0
1997,504615.2499999997
1998,469734.5999999999


In [0]:
display(ft_month_orders)

order_year,order_month,total_sales
1996,7,174.0
1997,4,51167.13999999999
1997,5,56818.64999999999
1997,6,38928.79999999999
1997,7,55072.48
1997,8,49976.84
1997,9,58952.22
1997,10,70322.29999999999
1997,11,45909.06
1997,12,77467.76


#### 3.10.1 Gravando os arquivos Delta

In [0]:
(ft_year_orders.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/ft_year_orders'))

(ft_month_orders.write
 .format("delta")
 .mode("overwrite")
 .save(northwind + 'refined/ft_month_orders'))

#### 3.10.2 Registrando as tabela Delta no Metastore

In [0]:
spark.sql(f'DROP TABLE IF EXISTS ft_year_orders')
spark.sql(f'DROP TABLE IF EXISTS ft_month_orders')


spark.sql(
    f"""
    CREATE TABLE ft_year_orders
    USING DELTA
    LOCATION "{northwind}refined/ft_year_orders"
    """
)
spark.sql(
    f"""
    CREATE TABLE ft_month_orders
    USING DELTA
    LOCATION "{northwind}refined/ft_month_orders"
    """
)

Out[112]: DataFrame[]

In [0]:
%sql
SELECT *
FROM ft_year_orders
WHERE order_year = 1997;

order_year,total_sales
1997,504615.2499999997
