# Create Delta tables
 
Objective: Convert a Parquet table into a Delta table.

Recall that a Delta table has three main characteristics:
- data files are kept in object storage (e.g. AWS S3, Azure Data Lake Storage)
- the transaction log is saved alongside the data files in object storage
- the table is registered in the Metastore (optional, but usually recommended)
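
The first two characteristics can be checked directly on storage: a Delta table directory holds a `_delta_log` folder with JSON commit files next to the Parquet data files. The following is a minimal sketch, assuming the path is reachable through the local file system. The helper name `looks_like_delta_table` is hypothetical; on cloud storage a listing call such as `dbutils.fs.ls` would take the place of `pathlib`.

```python
from pathlib import Path


def looks_like_delta_table(table_dir: str) -> bool:
    """Return True if table_dir contains a Delta transaction log with at least one commit."""
    log_dir = Path(table_dir) / "_delta_log"
    if not log_dir.is_dir():
        return False
    # Commit files are named after a zero-padded version number,
    # e.g. 00000000000000000000.json
    return any(p.suffix == ".json" and p.stem.isdigit() for p in log_dir.iterdir())
```

A plain Parquet directory returns `False` here, which is the state of the trusted tables before the conversion step.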

## Configuration

Before executing this cell, add your name in:
<a href="$./includes/configuration" target="_blank">
includes/configuration</a>

```username = "your_name"```

In [0]:
%run ./includes/configuration

Out[3]: DataFrame[]

## Create tables
With Delta Lake, tables are created in one of two ways:
* Ingesting new files into a Delta table for the first time
* Converting an existing Parquet table into a Delta table

**Note:**
In this sample project, files are saved to the Databricks File System (DBFS).
Good practice is to save files in cloud-based object storage; DBFS is used here for demonstration purposes only.

#### Step 0: Recover `Northwind` trusted tables
Trusted tables were saved in `trusted_path`

**Clean up and data recovery**

In [0]:
for table_name in northwind_tables_trusted:
  spark.sql(
    f"""
    DROP TABLE IF EXISTS {table_name}
    """
  )

for table_name in northwind_tables_refined:
  spark.sql(
    f"""
    DROP TABLE IF EXISTS {table_name}
    """
  )

In [0]:
for table_name in northwind_tables_trusted:
  spark.sql(
    f"""
    CREATE TABLE {table_name}
    USING PARQUET
    LOCATION "{trusted_path}{table_name}"
    """
  )

**Register partitions**

In [0]:
spark.sql("MSCK REPAIR TABLE orders_trusted")

Out[12]: DataFrame[]
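
`MSCK REPAIR TABLE` scans the table location for Hive-style partition directories (`column=value`) and registers them in the Metastore. As a rough illustration of what it discovers, the sketch below parses such a path into its partition values. The function name and the sample path are hypothetical and not taken from the project.

```python
def parse_partition_path(relative_path: str) -> dict:
    """Extract partition column values from a Hive-style relative path."""
    partitions = {}
    for segment in relative_path.strip("/").split("/"):
        # Only segments of the form column=value describe a partition;
        # the data file name itself is skipped
        if "=" in segment:
            column, value = segment.split("=", 1)
            partitions[column] = value
    return partitions


print(parse_partition_path("order_year=1996/order_month=7/order_day=4/part-00000.parquet"))
```

Values parsed this way are strings, which is consistent with the `string` type of the three partition columns of `orders_trusted`.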

#### Step 1: Describe `Northwind` tables
Before converting the `Northwind` tables, run the Spark SQL command `DESCRIBE` with the optional keyword `EXTENDED` to read the `Provider` attribute and check that it is `PARQUET`.

In [0]:
for table_name in northwind_tables_trusted:
  df = spark.sql(
    f"""
    DESCRIBE EXTENDED {table_name};
    """
    )
  provider = df.filter(df.col_name == 'Provider').collect()[0][1].upper()
  assert provider == 'PARQUET', f"provider is {provider}, not PARQUET"
print("Assertion passed - provider is PARQUET.")

Assertion passed - provider is PARQUET.
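
The `collect()[0][1]` lookup raises an `IndexError` when the `Provider` row is missing. A slightly more defensive variant is sketched below. It works on the `(col_name, data_type, comment)` rows returned by `DESCRIBE EXTENDED`, represented here as plain tuples so the logic can be tried without a cluster. The helper name `extract_provider` is hypothetical.

```python
from typing import Iterable, Optional, Sequence


def extract_provider(rows: Iterable[Sequence]) -> Optional[str]:
    """Return the upper-cased provider from DESCRIBE EXTENDED rows, or None if absent."""
    for row in rows:
        # row[0] is col_name; for the Provider row, row[1] holds the provider value
        if row[0] == "Provider":
            return str(row[1]).upper()
    return None


# In the notebook this could be called as:
#   extract_provider(spark.sql(f"DESCRIBE EXTENDED {table_name}").collect())
sample_rows = [("order_id", "smallint", None), ("Provider", "parquet", "")]
print(extract_provider(sample_rows))
```

Returning `None` lets the caller fail with a clearer message, for example by asserting that the result is not `None` before comparing it with `PARQUET`.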


### Convert existing Parquet tables into Delta tables

#### Step 1: Convert files to Delta files

First, the Parquet files are converted to Delta. The conversion creates a Delta Lake transaction log that tracks the existing data files.

In [0]:
from delta.tables import DeltaTable

for table_name in northwind_tables_trusted:
  parquet_table = f"parquet.`{trusted_path}{table_name}`"
  if table_name == 'orders_trusted':
    # convert partitioned table
    partitioning_scheme = "order_year string, order_month string, order_day string"
    DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)
  else:
    # convert unpartitioned table
    DeltaTable.convertToDelta(spark, parquet_table)
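
The same conversion can also be expressed in SQL with Delta Lake's `CONVERT TO DELTA` statement. The sketch below only builds the statement text, so it can be inspected before being passed to `spark.sql`. The helper name and the sample path are assumptions made for illustration.

```python
from typing import Optional


def convert_to_delta_sql(path: str, partition_schema: Optional[str] = None) -> str:
    """Build a CONVERT TO DELTA statement for a Parquet directory."""
    statement = f"CONVERT TO DELTA parquet.`{path}`"
    if partition_schema:
        # Partitioned tables must declare their partition columns and types
        statement += f" PARTITIONED BY ({partition_schema})"
    return statement


print(convert_to_delta_sql(
    "/tmp/trusted/orders_trusted",  # hypothetical path
    "order_year string, order_month string, order_day string",
))
```

As with `DeltaTable.convertToDelta`, the partition schema is only supplied for the partitioned table.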

#### Step 2: Register Delta tables
At this point, the files have been converted to Delta.
The Metastore, however, still describes the tables as Parquet.
To bring it up to date, the tables are dropped and registered again.
Spark SQL infers the schema from the Delta transaction log, so no column list is needed.

In [0]:
for table_name in northwind_tables_trusted:
  spark.sql(f"""
  DROP TABLE IF EXISTS {table_name}
  """)

  spark.sql(f"""
  CREATE TABLE {table_name}
  USING DELTA
  LOCATION "{trusted_path}{table_name}" 
  """)
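
The schema picked up by `CREATE TABLE ... USING DELTA` is stored in the transaction log, in a `metaData` action whose `schemaString` field holds the schema as JSON. The following is a minimal sketch of reading it from a commit file, assuming the log is reachable through the local file system. The helper name is hypothetical.

```python
import json
from typing import List, Optional


def read_schema_fields(commit_file: str) -> Optional[List[dict]]:
    """Return the column definitions from the first metaData action in a Delta commit file."""
    with open(commit_file, encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue
            # Each line of a commit file is one JSON-encoded action
            action = json.loads(line)
            if "metaData" in action:
                schema = json.loads(action["metaData"]["schemaString"])
                return schema["fields"]
    # Not every commit carries a metaData action
    return None
```

Each returned entry is a dictionary with keys such as `name` and `type`, which can be compared with the output of `DESCRIBE`.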

#### Step 3: Add comments to columns

Comments help during maintenance and make tables easier to read. The `ALTER TABLE` command is used to add comments to existing Delta tables.

In [0]:
for table_name in northwind_columns:
  table_name_trusted = table_name + '_trusted'
  replace_columns = ''
  counter = len(northwind_columns[table_name])
  for param in northwind_columns[table_name]:
    replace_columns += param[2] + ' ' + param[1].upper() + ' COMMENT ' + param[3]
    counter -= 1
    if counter:
      replace_columns += ', '
  spark.sql(
    f"""
    ALTER TABLE {table_name_trusted}
    REPLACE COLUMNS ({replace_columns})
    """
  )
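
The counter-based loop can be written more compactly with `", ".join(...)`. The sketch below assumes, as the cell above implies, that each entry of `northwind_columns[table_name]` keeps the data type at index 1, the column name at index 2 and an already-quoted comment at index 3. The sample entries are invented for illustration.

```python
def build_replace_columns(columns) -> str:
    """Build the column list for ALTER TABLE ... REPLACE COLUMNS."""
    return ", ".join(
        f"{param[2]} {param[1].upper()} COMMENT {param[3]}" for param in columns
    )


# Hypothetical entries; index 0 is not used here and its meaning is an assumption
sample_columns = [
    (0, "smallint", "order_id", "''"),
    (1, "date", "order_date", "'YYYY-mm-dd'"),
]
print(build_replace_columns(sample_columns))
```

The result can be interpolated into the `REPLACE COLUMNS (...)` clause in place of `replace_columns`.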

#### Step 4: Describe `Northwind` tables
Comments are verified with the Spark SQL command `DESCRIBE` and its optional keyword `EXTENDED`, which displays the comments together with additional table information. Scroll down to confirm that the provider is `delta`.

**Verify comments of table `orders_trusted`**

In [0]:
display(
  spark.sql(
    f"""
    DESCRIBE EXTENDED orders_trusted;
    """
  )
)

col_name,data_type,comment
order_id,smallint,
customer_id,string,
employee_id,smallint,
order_date,date,YYYY-mm-dd
order_year,string,YYYY
order_month,string,mm
order_day,string,dd
required_date,date,YYYY-mm-dd
shipped_date,date,YYYY-mm-dd
ship_via,smallint,


**Verify `Provider`**

In [0]:
for table_name in northwind_tables_trusted:
  df = spark.sql(
    f"""
    DESCRIBE EXTENDED {table_name};
    """
    )
  provider = df.filter(df.col_name == 'Provider').collect()[0][1].upper()
  assert provider == 'DELTA', f"provider is {provider}, not DELTA"
print("Assertion passed - provider is DELTA.")

Assertion passed - provider is DELTA.


#### Step 5: Count entries of `Northwind` tables
Entries of the trusted layer are counted using Apache Spark.
Unlike the Parquet table, a Delta table needs no `MSCK REPAIR TABLE` step: partitions are tracked in the transaction log, so the table is ready to query.

In [0]:
# dictionary of processed tables
northwind_processed = {}

for table_name in northwind_tables_trusted:
  northwind_processed[table_name] = spark.read.table(table_name)
  print(table_name + ":", northwind_processed[table_name].count())

categories_trusted: 8
customer_customer_demo_trusted: 0
customer_demographics_trusted: 0
customers_trusted: 91
employees_trusted: 9
employee_territories_trusted: 49
order_details_trusted: 2155
orders_trusted: 830
products_trusted: 77
region_trusted: 4
shippers_trusted: 6
suppliers_trusted: 29
territories_trusted: 53
us_states_trusted: 51


### Create new Delta tables in refined layer
New Delta tables will be created:
- Fact table: `ft_orders`
- Dimension tables: `dm_customers`, `dm_employees`, `dm_products`, `dm_shippers`

In [0]:
# dictionary of refined layer
refined_data_dict = {}

#### Fact table `ft_orders`
Join tables `orders_trusted` and `order_details_trusted`. The key is composed of `order_id` and `product_id`.

In [0]:
df = northwind_processed['orders_trusted'].join(northwind_processed['order_details_trusted'],
                                                northwind_processed['orders_trusted'].order_id == northwind_processed['order_details_trusted'].order_id2,
                                                'outer')

refined_data_dict['ft_orders'] = df.drop('order_id2')
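
The `'outer'` join type keeps rows from both sides even when no match exists and fills the missing side with nulls. The sketch below reproduces that behaviour in plain Python on made-up rows. The data and the helper `full_outer_join` are hypothetical; they mirror the semantics only, not Spark's implementation, and assume non-null keys.

```python
def full_outer_join(left, right, left_key, right_key):
    """Full outer join of two lists of dicts; unmatched sides are filled with None."""
    left_cols = {col for row in left for col in row}
    right_cols = {col for row in right for col in row}
    joined, matched_right = [], set()
    for l_row in left:
        matches = [i for i, r_row in enumerate(right) if r_row[right_key] == l_row[left_key]]
        matched_right.update(matches)
        if matches:
            joined.extend({**l_row, **right[i]} for i in matches)
        else:
            # Left row without a partner: right-hand columns become None
            joined.append({**l_row, **dict.fromkeys(right_cols)})
    # Right rows that never matched are kept with empty left-hand columns
    for i, r_row in enumerate(right):
        if i not in matched_right:
            joined.append({**dict.fromkeys(left_cols), **r_row})
    return joined


orders = [{"order_id": 1, "customer_id": "A"}, {"order_id": 2, "customer_id": "B"}]
details = [{"order_id2": 1, "product_id": 11}, {"order_id2": 3, "product_id": 42}]
for row in full_outer_join(orders, details, "order_id", "order_id2"):
    print(row)
```

In the fact table this means an order without detail lines, or a detail line without an order, still appears in `ft_orders` with nulls in the columns of the missing side.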

#### Dimension table `dm_customers`
Join tables `customers_trusted`, `customer_customer_demo_trusted` and `customer_demographics_trusted`. Key is `customer_id`.

In [0]:
df = northwind_processed['customers_trusted'].join(northwind_processed['customer_customer_demo_trusted'],
                                                   northwind_processed['customers_trusted'].customer_id == northwind_processed['customer_customer_demo_trusted'].customer_id2,
                                                   'outer')

df = df.join(northwind_processed['customer_demographics_trusted'],
             df.customer_type_id2 == northwind_processed['customer_demographics_trusted'].customer_type_id,
             'outer')

refined_data_dict['dm_customers'] = df.drop('customer_id2', 'customer_type_id2')

#### Dimension table `dm_employees`
Join tables `employees_trusted`, `employee_territories_trusted`, `territories_trusted` and `region_trusted`. The key is composed of `employee_id` and `territory_id`.

In [0]:
df = northwind_processed['employees_trusted'].join(northwind_processed['employee_territories_trusted'],
                                                   northwind_processed['employees_trusted'].employee_id == northwind_processed['employee_territories_trusted'].employee_id2,
                                                   'outer')

df = df.join(northwind_processed['territories_trusted'],
             df.territory_id2 == northwind_processed['territories_trusted'].territory_id,
             'outer')

df = df.join(northwind_processed['region_trusted'],
             df.region_id2 == northwind_processed['region_trusted'].region_id,
             'outer')

refined_data_dict['dm_employees'] = df.drop('employee_id2', 'territory_id2', 'region_id2')

#### Dimension table `dm_products`
Join tables `products_trusted`, `suppliers_trusted` and `categories_trusted`. Key is `product_id`.

In [0]:
df = northwind_processed['products_trusted'].join(northwind_processed['suppliers_trusted'],
                                                  northwind_processed['products_trusted'].supplier_id2 == northwind_processed['suppliers_trusted'].supplier_id,
                                                  'outer')

df = df.join(northwind_processed['categories_trusted'],
             df.category_id2 == northwind_processed['categories_trusted'].category_id,
             'outer')

refined_data_dict['dm_products'] = df.drop('supplier_id2', 'category_id2')

#### Dimension table `dm_shippers`
Table `shippers_trusted`. Key is `shipper_id`.

In [0]:
refined_data_dict['dm_shippers'] = northwind_processed['shippers_trusted']

#### Save files to refined folder

1. Use `.format("delta")`
2. Partition table `ft_orders` using columns `order_year`, `order_month`, `order_day`

In [0]:
dbutils.fs.rm(refined_path, recurse=True)

for table_name in refined_data_dict:
  if table_name == 'ft_orders':
    # save partitioned table
    (refined_data_dict[table_name].write
     .mode("overwrite")
     .format("delta")
     .partitionBy("order_year", "order_month", "order_day")
     .save(refined_path + table_name))
  else:
    # save unpartitioned table
    (refined_data_dict[table_name].write
     .mode("overwrite")
     .format("delta")
     .save(refined_path + table_name))
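
The two branches of the save loop differ only in the `partitionBy` call. One possible way to remove the duplication is to look up the partition columns per table, as in the minimal sketch below. The names `PARTITION_COLUMNS` and `partition_columns_for` are hypothetical.

```python
# Partition columns per refined table; tables not listed are written unpartitioned
PARTITION_COLUMNS = {
    "ft_orders": ["order_year", "order_month", "order_day"],
}


def partition_columns_for(table_name: str) -> list:
    """Return the partition columns configured for a table (empty list if none)."""
    return PARTITION_COLUMNS.get(table_name, [])


# Possible use inside the save loop (requires a Spark session, so left as a comment):
#   writer = refined_data_dict[table_name].write.mode("overwrite").format("delta")
#   columns = partition_columns_for(table_name)
#   if columns:
#       writer = writer.partitionBy(*columns)
#   writer.save(refined_path + table_name)
```

Keeping the mapping in one place makes it easier to partition further tables later without adding new branches.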