In [0]:
from pyspark.sql.functions import *  #core pyspark sql funcions for data transformation
from pyspark.sql.streaming import *  #structured streaming

In [0]:
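# Not needed on Databricks: the notebook runtime already provides a
# SparkSession named `spark`, which the cells below use directly.
# Kept for reference when a session has to be created manually:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder \
#    .appName("NameofStreaming") \
#    .getOrCreate()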

In [0]:
# --- Input: directory that Auto Loader watches for new files (landing zone) ---
origin = 'dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile'

# --- Output: bronze Delta table plus the streaming bookkeeping locations ---
# Every bronze artifact lives under one parent folder. The checkpoint and
# schema folders are siblings of the table's data folder, not nested inside it.
_bronze_dir = 'dbfs:/FileStore/bronze/Mobile/Offline'
target_table = "spark_catalog.bronze.mobile_offline"
target_path = f'{_bronze_dir}/Mobile'          # storage location of the Delta table data
checkpoint = f'{_bronze_dir}/Mobile_ckpt'      # streaming checkpoint location
schema = f'{_bronze_dir}/Mobile_schema'        # Auto Loader schema location (a path, not a schema object)
source = 'Mobile Offline'                      # constant written into the `source` column

In [0]:
# Incrementally pick up new parquet files from the landing zone with Databricks
# Auto Loader, then stamp every row with lineage/audit metadata columns.
streamingDF = (
    spark.readStream
    .format('cloudFiles')                                       # Databricks Auto Loader
    .option('cloudFiles.format', 'parquet')                     # input files are parquet
    .option('cloudFiles.inferColumnTypes', 'true')              # let Auto Loader infer column data types
    .option('cloudFiles.schemaLocation', schema)                # where Auto Loader persists the tracked schema
    .option('cloudFiles.schemaEvolutionMode', 'addNewColumns')  # evolve the schema when new columns appear
    .load(origin)
    # Full path of the file each row came from. This reads the file-metadata
    # column (as ingestion_date_time below does) instead of the deprecated
    # input_file_name(), which Unity Catalog shared clusters do not support.
    # NOTE(review): input_file_name() produced a URL-encoded path (e.g. '%20');
    # confirm downstream consumers of tracking_source accept _metadata.file_path's format.
    .withColumn('tracking_source', col('_metadata.file_path'))
    # Constant label identifying this feed
    .withColumn('source', lit(source))
    # Modification time of the source file in the landing zone
    .withColumn('ingestion_date_time', col('_metadata.file_modification_time'))
    # Reserved for future flags; always True for now
    .withColumn('status', lit(True))
)
    

In [0]:
#streamingDF.createOrReplaceTempView("streamingTable")

In [0]:
# Write the stream into the bronze Delta table. With availableNow=True the query
# processes every file currently pending in the landing zone (possibly over
# several micro-batches) and then stops, so the cell behaves like an
# incremental batch job.
query = (
    streamingDF.writeStream
    .queryName(target_table)                   # reuse the table identifier from the config cell
    .format("delta")                           # Delta Lake: ACID writes, table versioning
    .outputMode("append")                      # bronze only appends newly ingested rows
    .option("checkpointLocation", checkpoint)  # stream progress, so a rerun resumes where it left off
    .option("path", target_path)               # store the table's data at this explicit location
    .trigger(availableNow=True)                # batch-like: drain pending files, then finish
    # Alternative: keep the query running and start a micro-batch every 2 seconds:
    # .trigger(processingTime='2 seconds')
    # Continuous triggers (.trigger(continuous=...)) are not supported with
    # Auto Loader sources or Delta sinks, so they are not an option here.
    .table(target_table)                       # write to (and register) the catalog table
)

# Block until the availableNow run has finished. This ensures the next cell
# queries a complete table under "Run All" and re-raises any stream failure here.
# Remove this call if you switch to a processingTime trigger, because such a
# query never terminates on its own.
query.awaitTermination()

In [0]:
%sql

-- Preview the bronze table written by the streaming cell. The name is fully
-- qualified to match target_table, so the query does not depend on the current
-- default catalog. LIMIT keeps the notebook output small.
SELECT * FROM spark_catalog.bronze.mobile_offline LIMIT 10

Name,Address,IP,Connection_Time,Device,Speed_Connection,Connection_Status,_rescued_data,tracking_source,source,ingestion_date_time,status
Ann Gonzalez,Unit 7310 Box 7546 DPO AP 88109,172.26.7.35,2024-12-23T19:45:22.633968Z,Mobile,500,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Tony Walsh,"5078 Shannon Route Apt. 313 Wardberg, VA 27760",10.216.81.210,2024-12-23T19:45:22.634277Z,Mobile,1,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Stuart Cummings,"1523 Mccormick Vista Mariahland, WY 45747",10.63.198.136,2024-12-23T19:45:22.634938Z,Mobile,100,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Michelle Houston,"750 Leonard Junctions Apt. 073 Fletcherview, PA 05791",192.168.177.49,2024-12-23T19:45:22.635197Z,Mobile,400,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Jessica Lewis,"27208 Phillips Springs Suite 426 Elizabethborough, NY 47391",192.168.27.116,2024-12-23T19:45:22.635419Z,Mobile,1000,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Mike Dominguez,"552 Jason Corners East Christian, OR 63164",192.168.31.198,2024-12-23T19:45:22.635636Z,Mobile,1000,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Tiffany Mitchell,Unit 4361 Box 7338 DPO AP 30616,10.37.105.103,2024-12-23T19:45:22.635754Z,Mobile,25,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Nancy Freeman,"7335 Johnson Lodge East Royborough, MO 68538",172.19.132.153,2024-12-23T19:45:22.636144Z,Mobile,500,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Michelle Davis,"0301 Steven Trail Suite 882 Fuenteshaven, CO 99718",172.26.224.34,2024-12-23T19:45:22.636391Z,Mobile,1000,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
Jeremy Anderson,"17189 William Falls Maxwellstad, MA 47403",172.31.153.225,2024-12-23T19:45:22.636631Z,Mobile,15,Offline,,dbfs:/FileStore/landing_zone/Mobile/Offline/Mobile/Mobile_2024-12-23%2019:45:23.366960.parquet,Mobile Offline,2024-12-23T19:45:24Z,True
