In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True)
])


messages_schema = StructType([
    StructField("message_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("message", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])
#.schema(schema)\

def stream_mount_into_table(mount_path,table_name,schema):
    streaming_df = spark.readStream\
                    .format("cloudFiles")\
                    .option('cloudFiles.format', 'csv')\
                    .schema(schema)\
                    .load(mount_path)\
                    .writeStream\
                    .option("checkpointLocation",f"/tmp/checkpoint/{table_name}")\
                    .table(table_name)

customers_path = 'dbfs:/mnt/customers/'
customers_table = 'customers_bronze3'
messages_path = 'dbfs:/mnt/messages/'
messages_table = 'messages_bronze3'
stream_mount_into_table(customers_path,customers_table,customers_schema)
stream_mount_into_table(messages_path,messages_table,messages_schema)

In [0]:
%sql
CREATE OR REFRESH LIVE TABLE customer_messages_silver AS
SELECT *,
CASE WHEN lower(message) LIKE '%warning%' THEN 'Warning'
     WHEN lower(message) LIKE '%restored%' THEN 'Restored'
     WHEN lower(message) LIKE '%outage%' THEN 'Outage'
     WHEN lower(message) LIKE '%estimate%' THEN 'Estimate'
     ELSE NULL
END AS status_update,
to_date(timestamp) as date
FROM
(SELECT * FROM customers_bronze3
LEFT JOIN messages_bronze3
using (customer_id))

Name,Type
customer_id,int
first_name,string
last_name,string
email,string
message_id,int
message,string
timestamp,timestamp
status_update,string
date,date


In [0]:
%sql
CREATE OR REFRESH LIVE TABLE sent_messages_gold AS
SELECT date, customer_id, count(message_id) as messages_sent
FROM customers_messages_silver
where date is not null
group by 1,2

Name,Type
date,date
customer_id,bigint
messages_sent,bigint
