In [0]:
# Source: location the stream watches for newly uploaded invoice JSON files.
upload_path = "s3://trainingmar22-invoices/invoices/"

# Sink: Delta table location (Parquet data files plus the _delta_log/*.json
# transaction log). Replace "gks" with the name of your own S3 mount.
write_path = "/mnt/gks/delta-gks/invoices"

# Checkpoint directory handed to the streaming write.
checkpoint_path = "/tmp/delta-gks2/invoices/_checkpoints"

In [0]:
# Define a streaming read (Auto Loader / cloudFiles source) over the JSON
# files that arrive under upload_path. Example input record:
# {"InvoiceNo": 225124, "StockCode": "85123A", "Quantity": 10,
#  "Description": "TODO", "InvoiceDate": "03/25/2022 15:13",
#  "UnitPrice": 2.0, "CustomerID": 12583, "Country": "BE"}
#
# NOTE(review): the output displayed at the end of this notebook shows Country
# empty for every row even though the sample record carries "Country": "BE" --
# verify the key name/casing in the real input files.

# Explicit schema, so no schema inference is needed. InvoiceDate is kept as a
# plain string here (it is not parsed into a timestamp).
invoice_schema = (
    'InvoiceNo string, StockCode string, Description string, '
    'InvoiceDate string, UnitPrice double, CustomerID long, '
    'Country string, Quantity long'
)

# The 'header' option that used to be set here is a CSV option and has no
# meaning for a JSON source, so it has been removed.
df = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'json')
    .schema(invoice_schema)
    .load(upload_path)
)

# Lazy evaluation: this only defines the streaming DataFrame. No files are
# read until a writeStream query is started on it.

In [0]:
import pyspark.sql.functions as F

# Add the line total for each invoice row: Amount = UnitPrice * Quantity.
dfWithAmount = df.withColumn(
    "Amount",
    F.col("UnitPrice") * F.col("Quantity"),
)

# Print the resulting schema to confirm the new column is present.
dfWithAmount.printSchema()

In [0]:
# Start the streaming write into the Delta location at write_path.
# The checkpoint_path directory keeps a record of the files already picked up
# from upload_path, so only files uploaded since the last check are written
# to write_path.
query = (
    dfWithAmount.writeStream
    .format('delta')
    .option('checkpointLocation', checkpoint_path)
    .start(write_path)
)

# start() is the action that actually launches the query: Spark compiles the
# plan with Catalyst and executes it with Tungsten, stage by stage.
# No trigger is set, so the default applies: a new micro-batch is processed
# as soon as data is available.

In [0]:
# Check what has landed in the Delta location. This is a one-off batch read,
# not a stream, so it shows a snapshot of the table at the time the cell runs.
# NOTE(review): if this cell runs before the stream has committed its first
# micro-batch, the Delta table at write_path may not exist yet -- confirm.
invoicesDf = spark.read.load(write_path, format='delta')

display(invoicesDf)

InvoiceNo,StockCode,Description,InvoiceDate,UnitPrice,CustomerID,Country,Quantity,Amount
927359,84406G,TODO,03/25/2022 15:16,2.0,17850,,5,10.0
505456,85123A,TODO,03/25/2022 15:13,1.0,17850,,10,10.0
273724,84406G,TODO,03/25/2022 15:25,3.0,12583,,10,30.0
152668,85123A,TODO,03/25/2022 15:26,4.0,17850,,8,32.0
106877,84406G,TODO,03/25/2022 15:18,4.0,17850,,2,8.0
211129,84406G,TODO,03/25/2022 15:29,4.0,13047,,1,4.0
111495,84406B,TODO,03/25/2022 15:14,5.0,17850,,8,40.0
256563,84406B,TODO,03/25/2022 15:23,1.0,17850,,5,5.0
144778,85123A,TODO,03/25/2022 15:19,3.0,13047,,7,21.0
261392,84406B,TODO,03/25/2022 15:12,4.0,17850,,4,16.0
