# Reading from a Streaming Data Source and Writing to a Sink

***Notebook Name*** : 01-Processing-Streaming-Source-Data

***Source Stream Data Schema*** : "ARRIVAL_IN_TONNES double,DATETIME_OF_PRICING string,MARKET_NAME string,MAXIMUM_PRICE double,MINIMUM_PRICE double,MODAL_PRICE double,ORIGIN string,PRODUCTGROUP_NAME string,PRODUCT_NAME string,ROW_ID long,STATE_NAME string,VARIETY string,source_stream_load_datetime string"
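
The same schema string appears again in the reader call, so one option is to keep it in a single variable and sanity-check it before use. The sketch below is plain Python with no Spark dependency. `SOURCE_STREAM_SCHEMA_DDL` and `parse_ddl_columns` are hypothetical names introduced for illustration, and the parser only handles flat `name type` pairs like the ones in this schema.

```python
# Hypothetical variable name; the string is the schema documented above.
SOURCE_STREAM_SCHEMA_DDL = (
    "ARRIVAL_IN_TONNES double,DATETIME_OF_PRICING string,MARKET_NAME string,"
    "MAXIMUM_PRICE double,MINIMUM_PRICE double,MODAL_PRICE double,"
    "ORIGIN string,PRODUCTGROUP_NAME string,PRODUCT_NAME string,"
    "ROW_ID long,STATE_NAME string,VARIETY string,"
    "source_stream_load_datetime string"
)


def parse_ddl_columns(ddl):
    """Split a flat DDL string into (column_name, type_name) pairs.

    Illustrative helper only: it does not understand nested types such as
    struct<...> or decimal(10,2), which contain commas of their own.
    """
    columns = []
    for field in ddl.split(","):
        parts = field.split()
        if len(parts) != 2:
            raise ValueError(f"Expected 'name type', got: {field!r}")
        columns.append((parts[0], parts[1]))
    return columns


columns = parse_ddl_columns(SOURCE_STREAM_SCHEMA_DDL)
names = [name for name, _ in columns]

# Duplicate column names would be a mistake in the schema, so fail early.
if len(names) != len(set(names)):
    raise ValueError("Duplicate column names in schema")

print(len(columns), "columns")
```

In the notebook, the same variable could then be passed to the reader as `.schema(SOURCE_STREAM_SCHEMA_DDL)`, since `schema()` accepts a DDL-formatted string.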

##### Structured Streaming Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>

In [0]:
%python
sourceStreamJSONFilePath = 'abfss://bronze-dev@dbrkcrse20251storagedev.dfs.core.windows.net/daily-pricing-streaming-source-data'
sinkStreamJSONFilePath = 'abfss://bronze-dev@dbrkcrse20251storagedev.dfs.core.windows.net/daily-pricing-streaming-data/json'

In [0]:
# File-based streaming sources need an explicit schema by default,
# so the source schema is passed here as a DDL string.
sourceStreamJSONFileDF = (spark
                          .readStream
                          .schema("ARRIVAL_IN_TONNES double,DATETIME_OF_PRICING string,MARKET_NAME string,MAXIMUM_PRICE double,MINIMUM_PRICE double,MODAL_PRICE double,ORIGIN string,PRODUCTGROUP_NAME string,PRODUCT_NAME string,ROW_ID long,STATE_NAME string,VARIETY string,source_stream_load_datetime string")
                          .format("json")
                          .load(sourceStreamJSONFilePath))


In [0]:
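# First attempt: no checkpointLocation is set. For a file sink, Spark is
# expected to reject this with an AnalysisException unless
# spark.sql.streaming.checkpointLocation is configured. The next cell
# supplies the option explicitly.
(sourceStreamJSONFileDF
 .writeStream
 .format("json")
 .start(sinkStreamJSONFilePath)
)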

In [0]:
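# The checkpoint directory stores the query's progress so it can resume after a restart.
sinkStreamJSONcheckpointPath = 'abfss://bronze-dev@dbrkcrse20251storagedev.dfs.core.windows.net/daily-pricing-streaming-data/json/checkpoint'

streamProcessingQuery = (sourceStreamJSONFileDF
 .writeStream
 .outputMode("append")                  # file sinks support append mode only
 .format("json")
 .queryName("stream-processing")        # name attached to the query (appears in progress reports)
 .trigger(processingTime="5 minutes")   # start a micro-batch every 5 minutes
 .option("checkpointLocation", sinkStreamJSONcheckpointPath)
 .start(sinkStreamJSONFilePath)
)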

In [0]:
streamProcessingQuery.id

In [0]:
streamProcessingQuery.status

In [0]:
streamProcessingQuery.lastProgress
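
`lastProgress` holds the most recent progress report, or `None` before the first micro-batch has completed. A minimal sketch of pulling a few fields out of it follows, assuming the report supports dictionary-style access. `summarize_progress` is a hypothetical helper, and `sample_progress` is made-up illustration data shaped like a progress report, not output from this notebook.

```python
def summarize_progress(progress):
    """Return a small summary of a progress report, or None if there is none."""
    if progress is None:
        return None
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": progress.get("numInputRows"),
        "inputRowsPerSecond": progress.get("inputRowsPerSecond"),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond"),
    }


# Made-up values for illustration only; a real report contains more fields.
sample_progress = {
    "id": "00000000-0000-0000-0000-000000000000",
    "name": "stream-processing",
    "batchId": 3,
    "numInputRows": 1200,
    "inputRowsPerSecond": 4.0,
    "processedRowsPerSecond": 150.0,
}

summary = summarize_progress(sample_progress)
print(summary)
```

In the notebook, the call would be `summarize_progress(streamProcessingQuery.lastProgress)`.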

In [0]:
# Batch-read the files the stream has written so far and count the rows.
StreamDF = (spark
            .read
            .format("json")
            .load(sinkStreamJSONFilePath)
            )

display(StreamDF.count())

In [0]:
streamProcessingQuery.stop()