## Check and read data from the volume

Prerequisites:
1. The three raw datasets (users, orders, events) have been created in a volume under my catalog and schema.

In [0]:
%run ./setup

In [0]:
# Root folder of the raw retail data in the lab's Unity Catalog volume (lab-user specific path).
# No trailing slash: sub-folders are appended as rawDataVolume + '/users', '/orders', '/events'.
rawDataVolume = 'dbfs:/Volumes/dbacademy/labuser9128531_1739377194/retail'

In [0]:
# Folder holding the raw user files; rstrip avoids a '//' when rawDataVolume ends with '/'.
userRawDataVolume = rawDataVolume.rstrip('/') + '/users'
print('User raw data under folder: ' + userRawDataVolume)
# List the files under the directory, one name per line
for fileInfo in dbutils.fs.ls(userRawDataVolume):
  print(fileInfo.name)

In [0]:
# List the raw order files (rstrip avoids a '//' when rawDataVolume ends with '/')
display(dbutils.fs.ls(rawDataVolume.rstrip('/') + "/orders"))

In [0]:
%sql
-- Query the raw user JSON files directly from the volume, without creating a table
SELECT *
FROM json.`/Volumes/dbacademy/labuser9128531_1739377194/retail/users`

## Ingest raw JSON and CSV data into Bronze data layer

Prerequisites:
1. dbfs:/wy has been created

In [0]:
# Directory for streaming bookkeeping only: Auto Loader schema locations (schema/<table>)
# and streaming checkpoints (checkpoint/<name>) for both the bronze and silver streams.
# The data itself is stored in catalog.schema as Delta tables.
deltaTablesDirectory = 'dbfs:/wy/delta_tables'

In [0]:
# Reset all streaming checkpoints so the ingestion and silver cells reprocess every input.
# NOTE(review): the bronze/silver tables are not dropped here, and the streams write in
# append mode, so a re-run after this reset will likely append the same rows again.
# Drop or truncate the tables as well if a clean rebuild is intended.
dbutils.fs.rm(f"{deltaTablesDirectory}/checkpoint/", recurse=True)

In [0]:
# Echo both root paths as a quick sanity check before ingesting
(deltaTablesDirectory, rawDataVolume)

### Ingestion

Add three Delta tables to Unity Catalog: `churn_orders_bronze`, `churn_app_events` and `churn_users_bronze`.

In [0]:
def ingest_folder(folder, data_format, table):
  """Incrementally load the raw files in `folder` into the Delta table `table` with Auto Loader.

  Args:
    folder: path of the volume folder holding the raw files.
    data_format: Auto Loader source format, e.g. 'json' or 'csv'.
    table: name of the target table in the current catalog.schema; created if it does not exist.

  Returns:
    The started StreamingQuery. It is not awaited here: call .awaitTermination()
    on it before reading `table`.
  """
  raw_stream = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", data_format)
                  .option("cloudFiles.inferColumnTypes", "true")
                  # Auto Loader infers the schema and tracks its evolution in this location
                  .option("cloudFiles.schemaLocation", f"{deltaTablesDirectory}/schema/{table}")
                  .load(folder))
  return (raw_stream.writeStream
            # Checkpoint gives exactly-once delivery to the Delta table across restarts/kills
            .option("checkpointLocation", f"{deltaTablesDirectory}/checkpoint/{table}")
            .option("mergeSchema", "true")  # add any new column to the table dynamically
            .trigger(once=True)  # process what is available, then stop; remove for continuous streaming
            .table(table))  # table is created on first write if it does not exist yet

# Ingest each raw folder of the volume into a bronze table in the current catalog.schema.
# rstrip avoids a '//' in the paths when rawDataVolume ends with '/'.
raw_root = rawDataVolume.rstrip('/')
bronze_queries = [
  ingest_folder(f"{raw_root}/orders", 'json', 'churn_orders_bronze'),
  ingest_folder(f"{raw_root}/events", 'csv', 'churn_app_events'),
  ingest_folder(f"{raw_root}/users", 'json', 'churn_users_bronze'),
]
# Wait for every stream, not only the last one: the later cells read all three tables.
for query in bronze_queries:
  query.awaitTermination()

### Check the bronze tables in Unity Catalog

In [0]:
%sql
-- Look at the "_rescued_data" column: values that do not match the existing schema end up there
SELECT *
FROM churn_users_bronze;

In [0]:
# List the raw volume root via the configured path instead of re-hard-coding the lab path
display(dbutils.fs.ls(rawDataVolume))

In [0]:
# List the schema/checkpoint directory via the configured path
display(dbutils.fs.ls(f"{deltaTablesDirectory}/"))

In [0]:
# List the Auto Loader schema locations (one sub-folder per bronze table)
display(dbutils.fs.ls(f"{deltaTablesDirectory}/schema/"))

## Create silver layer

The silver tables are saved to the same catalog.schema as the bronze tables.

### Create churn users in the catalog.schema

Add two more tables to Unity Catalog: `churn_users` and `churn_orders`.

In [0]:
from pyspark.sql.functions import sha1, col, initcap, to_timestamp

# Silver users: clean the churn_users_bronze stream and persist it as churn_users.
# The churn label column (if present) is intentionally kept; only _rescued_data is dropped.
(spark.readStream
        .table("churn_users_bronze")
        .withColumnRenamed("id", "user_id")
        .withColumn("email", sha1(col("email")))  # store the SHA-1 digest instead of the raw address
        # 'H' parses both 1- and 2-digit hours ('HH' would turn e.g. "3:04:05" into null),
        # so both dates use the same pattern
        .withColumn("creation_date", to_timestamp(col("creation_date"), "MM-dd-yyyy H:mm:ss"))
        .withColumn("last_activity_date", to_timestamp(col("last_activity_date"), "MM-dd-yyyy H:mm:ss"))
        .withColumn("firstname", initcap(col("firstname")))
        .withColumn("lastname", initcap(col("lastname")))
        .withColumn("age_group", col("age_group").cast('int'))
        .withColumn("gender", col("gender").cast('int'))
        .drop(col("_rescued_data"))
      .writeStream
        .option("checkpointLocation", f"{deltaTablesDirectory}/checkpoint/users")
        .trigger(once=True)  # process what is available, then stop
        .table("churn_users").awaitTermination())

In [0]:
%sql
-- Inspect the silver users table
SELECT *
FROM churn_users;

In [0]:
# Silver orders: clean the churn_orders_bronze stream and persist it as churn_orders.
orders_silver_stream = (
  spark.readStream.table("churn_orders_bronze")
    .withColumnRenamed("id", "order_id")
    .withColumn("amount", col("amount").cast('int'))
    .withColumn("item_count", col("item_count").cast('int'))
    # creation_date is parsed from the raw transaction_date string, which is kept as-is
    .withColumn("creation_date", to_timestamp(col("transaction_date"), "MM-dd-yyyy H:mm:ss"))
    .drop(col("_rescued_data"))
)

orders_silver_query = (
  orders_silver_stream.writeStream
    .option("checkpointLocation", f"{deltaTablesDirectory}/checkpoint/orders")
    .trigger(once=True)  # process what is available, then stop
    .table("churn_orders")
)
# Block until the batch finishes so the following cells see the data
orders_silver_query.awaitTermination()

In [0]:
%sql
-- Inspect the silver orders table
SELECT *
FROM churn_orders;

In [0]:
%sql
-- Inspect the bronze app-events table that feeds the gold layer
-- (this cell previously duplicated the churn_orders query above)
SELECT *
FROM churn_app_events;

## Create gold layer

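Prerequisites:
1. The catalog and schema are set up, and the tables `churn_users`, `churn_orders` (silver) and `churn_app_events` (bronze) exist.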

In [0]:
# Gold layer: one row per user that combines the silver user profile with per-user
# order and app-event aggregates. Users missing from either aggregate are dropped
# by the INNER JOINs. The recency features are relative to the time this cell runs.
spark.sql(
  """
    CREATE OR REPLACE TABLE churn_features AS
      WITH
        -- Order statistics per user
        churn_orders_stats AS (
          SELECT
            user_id,
            count(*) as order_count,
            sum(amount) as total_amount,
            sum(item_count) as total_item,
            max(creation_date) as last_transaction
          FROM churn_orders
          GROUP BY user_id
        ),
        -- App-event statistics per user
        churn_app_events_stats AS (
          SELECT
            -- NOTE(review): first() picks an arbitrary platform per user (non-deterministic)
            first(platform) as platform,
            user_id,
            count(*) as event_count,
            count(distinct session_id) as session_count,
            -- 'H' parses both 1- and 2-digit hours, like the silver orders parsing
            max(to_timestamp(date, "MM-dd-yyyy H:mm:ss")) as last_event
          FROM churn_app_events GROUP BY user_id
        )
        SELECT
          *,
          datediff(now(), creation_date) as days_since_creation,
          datediff(now(), last_activity_date) as days_since_last_activity,
          datediff(now(), last_event) as days_last_event
        FROM churn_users
        INNER JOIN churn_orders_stats using (user_id)
        INNER JOIN churn_app_events_stats using (user_id)
  """
)

display(spark.table("churn_features"))

## Optimize the Delta tables

In [0]:
%sql
-- Turn on optimized writes and auto compaction for the silver and gold tables
ALTER TABLE churn_users    SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = TRUE, delta.autoOptimize.autoCompact = TRUE);
ALTER TABLE churn_orders   SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = TRUE, delta.autoOptimize.autoCompact = TRUE);
ALTER TABLE churn_features SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = TRUE, delta.autoOptimize.autoCompact = TRUE);

In [0]:
%sql
-- Compact churn_users and Z-order it on user_id (the key churn_features joins on) and the name columns
OPTIMIZE churn_users
ZORDER BY (user_id, firstname, lastname)