# Stream Customer Data from Cloud Files to Delta Lake

In [0]:
%python
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DateType,
    TimestampType
)

# Explicit schema for the incoming customer JSON records. Every field keeps
# StructField's default nullable=True. A streaming file source needs a schema
# up front unless streaming schema inference is enabled.
customer_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("customer_id", IntegerType()),
            ("customer_name", StringType()),
            ("date_of_birth", DateType()),
            ("telephone", StringType()),
            ("email", StringType()),
            ("member_since", DateType()),
            ("created_timestamp", TimestampType()),
        )
    ]
)

In [0]:
%python
# Incrementally read the customer JSON files from the landing volume,
# applying the explicit customer_schema rather than inferring one.
customer_df = spark.readStream.json(
    "/Volumes/gizmobox/landing/operational_data/customer_schema/",
    schema=customer_schema,
)

# Transform DataFrame to Add Columns

1. `file_path`: the cloud storage path of the source file each record was read from
2. `ingestion_date`: the current timestamp at ingestion time

In [0]:
%python
from pyspark.sql.functions import col, current_timestamp

# Keep every source column and append two lineage columns:
#   file_path      - path of the file the record came from, taken from the
#                    file source's hidden _metadata column
#   ingestion_date - timestamp from current_timestamp() when the batch runs
customer_transformed_df = customer_df.select(
    "*",
    col("_metadata.file_path").alias("file_path"),
    current_timestamp().alias("ingestion_date"),
)


# Write Transformed Data Stream to Delta Table

In [0]:
%python
# Start writing the enriched stream into the bronze Delta table.
# trigger(availableNow=True) processes the files available when the query
# starts and then lets the query stop on its own.
# NOTE(review): the checkpoint directory sits inside the source landing
# folder. Its leading underscore presumably keeps the JSON file source from
# reading it, but a dedicated checkpoint location would be cleaner. Changing
# it would reset stream progress, so confirm before moving it.
streaming_query = (
    customer_transformed_df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .options(
        checkpointLocation="/Volumes/gizmobox/landing/operational_data/customer_schema/_checkpoint_stream",
    )
    .toTable("gizmobox.bronze.customer_stream")
)

In [0]:
%python
# The query runs with trigger(availableNow=True), so it stops by itself after
# processing the data that was available at start. Calling stop() right away
# could interrupt that run and leave the table partially loaded. Instead, wait
# for the query to finish. If the stream failed, awaitTermination() re-raises
# the error here rather than hiding it.
streaming_query.awaitTermination()

In [0]:
-- Inspect the rows written to the bronze table by the streaming query.
SELECT *
FROM gizmobox.bronze.customer_stream;