## Stream Customer Data From Cloud Files to Delta Lake
1. Read files from cloud storage using the DataStreamReader API
1. Transform the DataFrame to add the following columns
    -   file path: Cloud file path
    -   ingestion date: Current timestamp
1. Write the transformed data stream to a Delta Lake table

### 1. Read files using the DataStreamReader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType

# Streaming file sources need the schema up front (schema inference is off by default)
customers_schema = StructType([
    StructField("customer_id", IntegerType()),
    StructField("customer_name", StringType()),
    StructField("date_of_birth", DateType()),
    StructField("telephone", StringType()),
    StructField("email", StringType()),
    StructField("member_since", DateType()),
    StructField("created_timestamp", TimestampType()),
])

In [0]:
# Build a streaming DataFrame over the JSON files in the landing directory
customers_df = (
    spark.readStream
         .format("json")
         .schema(customers_schema)
         .load("/Volumes/gizmobox/landing/operational_data/customers_stream/")
)
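
To try the reader against a scratch directory, it helps to have a file in the shape the schema expects. The sketch below is not part of the original notebook: `write_sample_customers` is a hypothetical helper, the record values are invented, and it assumes the source files are JSON Lines (one object per line, which is what Spark's `json` format reads by default) with dates as `yyyy-MM-dd` and ISO-style timestamps.

```python
import json
from pathlib import Path

# Hypothetical record: field names follow customers_schema, values are invented.
SAMPLE_CUSTOMERS = [
    {
        "customer_id": 1,
        "customer_name": "Sample Customer",
        "date_of_birth": "1990-04-12",
        "telephone": "555-0100",
        "email": "sample.customer@example.com",
        "member_since": "2023-01-05",
        "created_timestamp": "2023-01-05T09:30:00",
    },
]


def write_sample_customers(directory, records=SAMPLE_CUSTOMERS,
                           file_name="customers_sample.json"):
    """Write records as JSON Lines (one object per line) and return the file path."""
    target = Path(directory)
    target.mkdir(parents=True, exist_ok=True)
    path = target / file_name
    with path.open("w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")
    return path
```

Writing such a file into the directory passed to `.load(...)` while the query is running should make its rows show up in a later micro-batch, assuming the real files use the same layout.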

### 2. Transform the DataFrame to add the following columns
- file path: Cloud file path
- ingestion date: Current timestamp

In [0]:
from pyspark.sql.functions import current_timestamp, col

customers_transformed_df = (
    customers_df
        # _metadata is a hidden column exposed by file-based sources
        .withColumn("file_path", col("_metadata.file_path"))
        # Time at which the row was ingested
        .withColumn("ingestion_date", current_timestamp())
)

### 3. Write the transformed data stream to a Delta table

In [0]:
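# toTable() starts the query and returns a StreamingQuery handle
streaming_query = (
    customers_transformed_df.writeStream
        .format("delta")
        # The checkpoint records progress so a restarted query resumes where it left off
        .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoint_stream")
        .toTable("gizmobox.bronze.customers_stream")
)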

In [0]:
# Stop the query when finished; until then it keeps running and watching for new files
streaming_query.stop()
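
If the notebook is run top to bottom rather than cell by cell, calling `stop()` right after starting the query may halt it before the files have been processed. One alternative, sketched here as an assumption rather than as the notebook's own approach, is the `availableNow` trigger (PySpark 3.3 or later), which processes everything currently available and then ends the query by itself. The helper name `run_available_now` and its parameters are hypothetical.

```python
def run_available_now(stream_df, table_name, checkpoint_location, timeout_seconds=None):
    """Write a streaming DataFrame to a Delta table, processing the current
    backlog once and then letting the query terminate by itself.

    stream_df is expected to be a streaming DataFrame such as
    customers_transformed_df; this helper is illustrative only.
    """
    query = (
        stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_location)
        # Process all files available now, then stop instead of waiting for more
        .trigger(availableNow=True)
        .toTable(table_name)
    )
    # Block until the backlog is processed (or the optional timeout elapses)
    query.awaitTermination(timeout_seconds)
    return query
```

With the names used in this notebook, the call would look like `run_available_now(customers_transformed_df, "gizmobox.bronze.customers_stream", "/Volumes/gizmobox/landing/operational_data/customers_stream/_checkpoint_stream")`, after which no explicit `stop()` should be needed.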

In [0]:
%sql
SELECT * FROM gizmobox.bronze.customers_stream;