## Stream Customers Data From Cloud Files to Delta Lake
1. Read files from cloud storage using the DataStreamReader API
1. Transform the DataFrame to add the following columns:
    -   `file_path`: path of the source cloud file
    -   `ingestion_date`: current timestamp at ingestion
1. Write the transformed data stream to a Delta Lake table

In [0]:
# 1.
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Explicit schema for the customer JSON files read by the streaming reader.
# Each (column name, Spark type) pair below becomes one StructField, in order.
customers_schema = StructType(
    fields=[
        StructField(column_name, column_type)
        for column_name, column_type in [
            ("customer_id", IntegerType()),
            ("customer_name", StringType()),
            ("date_of_birth", DateType()),
            ("telephone", StringType()),
            ("email", StringType()),
            ("member_since", DateType()),
            ("created_timestamp", TimestampType()),
        ]
    ]
)

In [0]:
# Streaming DataFrame over the JSON files in the customers_stream landing
# folder, parsed with the explicit customers_schema (no schema inference).
customer_df = (
    spark.readStream.format("json")
    .schema(customers_schema)
    .load("/Volumes/gizmobox/landing/operational_data/customers_stream/")
)

In [0]:
# 2.
# Add two audit columns to every streamed customer record:
#   file_path      - source file of the row, taken from the hidden `_metadata` column
#   ingestion_date - value of F.current_timestamp() when the row is processed
# NOTE: the variable keeps its original "transormed" spelling because the
# write cell references it by that name.
customers_transormed_df = customer_df.withColumn(
    "file_path", F.col("_metadata.file_path")
).withColumn(
    "ingestion_date", F.current_timestamp()
)

In [0]:
# 3.

# Start the streaming write of the enriched customer rows into the Delta table
# gizmobox.bronze.customers_stream, persisting stream progress under the
# checkpoint location. `toTable` starts the query and returns the handle that
# a later cell uses to stop it.
# NOTE(review): the checkpoint lives inside the monitored landing folder. The
# "_" prefix presumably keeps the file source from picking it up, but a
# location outside the source folder is safer. Moving it resets the stream's
# progress (already-seen files would be re-ingested), so change it deliberately.
streaming_query = customers_transormed_df.writeStream.format("delta").option(
    "checkpointLocation",
    "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoint_stream",
).toTable("gizmobox.bronze.customers_stream")

In [0]:
# Stop the streaming query started by the write cell; run this once the stream
# has processed the files you want in the bronze table.
streaming_query.stop()

In [0]:
%sql

-- Inspect the rows the streaming query has written to the bronze table.
SELECT * FROM gizmobox.bronze.customers_stream

In [0]:
import requests
import json

# Fetch the configuration of the cluster this notebook is attached to via the
# Databricks Clusters REST API (GET /api/2.0/clusters/get), authenticating with
# the notebook context's API token, and pretty-print the JSON response.
# NOTE(review): `dbutils.notebook.entry_point` appears to be an internal,
# undocumented entry point; confirm it is available on the target compute.
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host = ctx.apiUrl().get()
token = ctx.apiToken().get()
cluster_id = ctx.clusterId().get()

# Bound the request so a stalled connection cannot hang the notebook forever
# (requests waits indefinitely when no timeout is given), and fail loudly on
# HTTP errors instead of printing an error payload as if it were cluster info.
response = requests.get(
    f"{host}/api/2.0/clusters/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"cluster_id": cluster_id},
    timeout=30,
)
response.raise_for_status()

cluster_info = response.json()
print(json.dumps(cluster_info, indent=2))