##### Customer Orders - Delta Live Tables
This Notebook demonstrates how to create a notebook for a Delta Live Table Pipeline. 

In [None]:
#!pip install dlt
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

##### Bronze Tables

##### orders bronze table

In [None]:
orders_path = "dbfs:/FileStore/data/orders/orders.csv"
orders_schema = StructType([
                    StructField("order_id", IntegerType(), False),
                    StructField("order_date", StringType(), False),
                    StructField("order_customer_id", IntegerType(), False),
                    StructField("order_status", StringType(), False)
                    ]
                    )

In [None]:
@dlt.table
def orders_bronze():
    df = spark.read.format("csv").option("header",True).schema(orders_schema).load(orders_path)
    return df

In [None]:
orders_path = "dbfs:/FileStore/data/orders/orders.csv"
df = spark.read.format("csv").option("header",True).schema(orders_schema).load(orders_path).display()

##### order_items bronze table

In [None]:
order_items_path = "dbfs:/FileStore/data/order_items/order_items.csv"
order_items_schema = StructType([
                    StructField("order_item_id", IntegerType(), False),
                    StructField("order_item_order_id", IntegerType(), False),
                    StructField("order_item_product_id", IntegerType(), False),
                    StructField("order_item_quantity", IntegerType(), False),
                    StructField("order_item_subtotal", DoubleType(), False),
                    StructField("order_item_product_price", DoubleType(), False)
                    ]
                    )

In [None]:
order_items_path = "dbfs:/FileStore/data/order_items/order_items.csv"
df = spark.read.format("csv").option("header",True).schema(orders_schema).load(order_items_path).display()

In [None]:
@dlt.table
def order_items_bronze():
    return spark.read.format("csv").option("header",True).schema(order_items_schema).load(order_items_path)

##### Silver Tables

###### orders Silver Table

In [None]:
@dlt.table
def orders_silver():
    return (
        dlt.read("orders_bronze")
              .select(
              'order_id', \
              to_date('order_date', "dd-MMM-yy kk.mm.ss.SS").alias('order_date'), \
              'order_customer_id', \
              'order_status', 
              current_timestamp().alias("modified_date")
               )
          )

###### order_items Silver Table

In [None]:
@dlt.table
def order_items_silver():
    return (
        dlt.read("order_items_bronze")
              .select(
              'order_item_id', \
              'order_item_product_id', \
              'order_item_product_price', \
              'order_item_quantity',
              current_timestamp().alias("modified_date")
               )
          )

In [None]:
Create Delta Lake table with partitions

In [None]:
df = spark.createDataFrame(
    [
        ("John", "Anderson", "England", "Handball"),
        ("James", "LeBron", "Usa", "Basketball"),
        ("Lilian", "Thuram", "France", "Football"),
        ("Sadio", "Mane", "Senegal", "Football"),
    ]
).toDF("first_name", "last_name", "country", "sport")


In [None]:
# Write the DataFrame out to a Delta table called country_people
df.repartition(col("country")).write.partitionBy("country").format( "delta").saveAsTable("country_people")


In [None]:
spark.sql("select * from default.country_people").display()

In [None]:
%sql
show tables

In [None]:
%sql
DESC EXTENDED country_people

In [None]:
%fs
ls dbfs:/user/hive/warehouse/country_people

In [None]:
# convert to delta sql command

In [None]:
%sql
CONVERT TO DELTA country_people

In [None]:
%fs
ls dbfs:/user/hive/warehouse/country_people

In [None]:
%fs
ls dbfs:/user/hive/warehouse/country_people/_delta_log/