
#### Creating a user with S3 access

- Go to https://aws.amazon.com/console/
- In the service search bar, type "IAM" and select "Users" from the menu
- On the Users page, click "Add users"
- Enter the username "loony_user" and tick "Access key - Programmatic access"
- Click "Next: Permissions"
- On the Set permissions page, click "Attach existing policies directly"
- Search for "s3" in the Filter policies box
- Tick "AmazonS3FullAccess", then click "Next: Tags"
- Click "Next: Review"
- On the Review page, click "Create user" to create the new user

Once the user is created, AWS shows the access key ID and the secret access key.
Save these details to a file named loony_user_credentials.csv.
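
The saved credentials file can be read back with the standard library rather than copied by hand. This is a minimal sketch, assuming the CSV has the header columns "Access key ID" and "Secret access key" (the exact header names depend on how the file was saved, so treat them as an assumption). The sample values are placeholders, not real keys.

```python
import csv
import io


def parse_credentials_csv(text):
    """Return (access_key_id, secret_access_key) from the first data row."""
    reader = csv.DictReader(io.StringIO(text))
    row = next(reader)
    # Column names are an assumption; adjust them to match your file.
    return row["Access key ID"], row["Secret access key"]


# Placeholder content standing in for loony_user_credentials.csv.
# For the real file, read it with:
#   open("loony_user_credentials.csv", newline="", encoding="utf-8-sig").read()
sample = "Access key ID,Secret access key\nAKIAEXAMPLEKEYID,exampleSecretValue\n"

access_key_id, secret_access_key = parse_credentials_csv(sample)
print(access_key_id)  # print only the key ID, never the secret
```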

------------------------------------------------------------
#### Showing the S3 bucket with data

- Search for S3 using the search bar
- Select S3 under "Storage"
- Bucket name : loony-delta-source-bucket (already created)
- Folder name: headphone_data/ (already created)
- Folder already contains one CSV file

In [0]:
import urllib
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType


#### Configure credentials to access AWS

- Go to Compute -> Edit compute
- Add the following environment variables

`AWS_ACCESS_KEY_ID="<your-access-key-id>"`

`AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"`

Use the values saved in loony_user_credentials.csv.
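
After the compute restarts, it can help to confirm that both variables are visible to the notebook before touching S3. The sketch below only checks for their presence and never prints the values. The helper name `missing_aws_env_vars` is hypothetical.

```python
import os

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_env_vars(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_aws_env_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("AWS credentials are set on this compute")
```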


In [0]:
aws_s3_bucket = "loony-delta-source-bucket"
 
dbutils.fs.ls(f"s3a://{aws_s3_bucket}/")

[FileInfo(path='s3a://loony-delta-source-bucket/_checkpoint/', name='_checkpoint/', size=0, modificationTime=1749121221805),
 FileInfo(path='s3a://loony-delta-source-bucket/attrition_data/', name='attrition_data/', size=0, modificationTime=1749121221805),
 FileInfo(path='s3a://loony-delta-source-bucket/bank_churn_data/', name='bank_churn_data/', size=0, modificationTime=1749121221805),
 FileInfo(path='s3a://loony-delta-source-bucket/headphone_data/', name='headphone_data/', size=0, modificationTime=1749121221805)]
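
The `modificationTime` values in the listing above are epoch timestamps in milliseconds. A small sketch for converting one to a readable UTC time, using the value from the listing (the helper name `millis_to_utc` is hypothetical):

```python
from datetime import datetime, timedelta, timezone


def millis_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Integer arithmetic avoids floating-point rounding on the milliseconds.
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


modified = millis_to_utc(1749121221805)
print(modified.isoformat())
```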

In [0]:
headphone_stream_df = spark.readStream.format("cloudFiles") \
                           .option("cloudFiles.format", "csv") \
                           .option("cloudFiles.schemaEvolutionMode", "rescue") \
                           .option("cloudFiles.inferColumnTypes", "true") \
                           .option("cloudFiles.schemaLocation", 
                                   f"s3://{aws_s3_bucket}/_checkpoint") \
                           .load(f"s3://{aws_s3_bucket}/headphone_data/*")

headphone_stream_df.display()

title,brand,color,type,avg_rating,num_of_ratings,selling_price,MRP,_rescued_data
Wings Phantom Pro Gaming,Wings Phantom,Black,True Wireless,4.0,1934,2493,5499,
Boult Audio ProBass Curve,Boult Audio,Blue,True Wireless,4.0,333036,799,3499,
Aroma NB120 Tehalka,Aroma,Black,In the Ear,3.9,10398,379,1999,
N2B MAGNET Red,N2B,Black,True Wireless,3.5,20079,260,1875,
OnePlus Bullets Wireless Z,OnePlus,Bold Black,In the Ear,4.3,437395,1999,2190,
Aroma NB119B Badshah,Aroma,Black,In the Ear,4.0,29426,459,1499,
Oxhox HBS-730 Sports Stereo,Oxhox,Black,In the Ear,3.5,12909,260,1999,
Enacfire E60 Bullets Wireless,Enacfire,Reverb black,In the Ear,4.3,437106,599,3800,
boAt Bassheads 103 Black,boAt,Black,In the Ear,4.2,1299042,349,1290,
boAt Rockerz 235v2,boAt,Black,In the Ear,3.4,48161,999,2990,


* Expand the monitoring graph and show the spike when the streaming data is read in
* Go to the AWS S3 bucket and add file 02 to the source folder
* Show the second spike when the new data is read in (the number of rows should also have increased from 20 to 43)


#### Files written in the Delta format

The streaming write below creates a Delta Lake table on disk at /delta/headphones.

To make the table queryable by name in Spark SQL, also register it in the metastore.

In [0]:
headphone_stream_df.writeStream\
                   .format("delta")\
                   .outputMode("append")\
                   .option("checkpointLocation", "/delta/events/_checkpoints/headphones")\
                   .start("/delta/headphones")

<pyspark.sql.streaming.query.StreamingQuery at 0x7a7ba1b4fc50>

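Once the stream has created the table at /delta/headphones, registering it by name could look like the sketch below. The table name `headphones` and the helper `register_delta_table_sql` are hypothetical, and whether a path-based table can be registered this way depends on the workspace configuration, so treat this as an illustration rather than the required step.

```python
def register_delta_table_sql(table_name, path):
    """Build a statement that registers an existing Delta path under a name."""
    return f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{path}'"


statement = register_delta_table_sql("headphones", "/delta/headphones")
print(statement)

# In a Databricks notebook, where `spark` is predefined, run:
# spark.sql(statement)
```

After that, `SELECT * FROM headphones` would be equivalent to querying the path directly.
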
In [0]:
%sql
 
SELECT * from delta.`/delta/headphones`

title,brand,color,type,avg_rating,num_of_ratings,selling_price,MRP,_rescued_data
boAt Airdopes 131 Bluetooth,boAt,Midnight Blue,True Wireless,3.4,12140,1099,2990,
Allmusic powerful driven bass,Allmusic powerful,Multicolor,In the Ear,4.0,15841,260,1599,
Aroma NB119C Carter,Aroma,Black,In the Ear,3.7,92346,449,1999,
Boult Audio AirBass Combuds,Boult Audio,White,True Wireless,3.9,43126,1299,4999,
Mivi Duopods F30,Mivi,White,True Wireless,3.9,99486,999,2999,
Mivi Collar Classic Neckband,Mivi,Black,In the Ear,3.4,56834,699,2499,
CatBull In-ear Bluetooth Headset,CatBull,Black,True Wireless,3.6,1904,130,499,
FIER MGNT_k1-2 Bluetooth Headset,FIER,Black,True Wireless,4.2,887466,228,999,
boAt Rockerz 255F Bluetooth,boAt,Active Black,In the Ear,4.3,90,1099,2990,
Grostar Moonwalk Mini in,Grostar,Red,In the Ear,4.1,14148,699,1995,


In [0]:
%sql
 
DESCRIBE HISTORY delta.`/delta/headphones`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-06-05T11:07:18Z,599420620358905,contact@loonycorn.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 53949ff6-4ee1-402f-b711-ae6a56ac2981, epochId -> 2, statsOnLoad -> false)",,List(2326614934370380),0605-103812-dlsabfv3,2.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 10, numOutputBytes -> 2916, numAddedFiles -> 1)",,Databricks-Runtime/15.4.x-photon-scala2.12
2,2025-06-05T11:06:22Z,599420620358905,contact@loonycorn.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 53949ff6-4ee1-402f-b711-ae6a56ac2981, epochId -> 1, statsOnLoad -> false)",,List(2326614934370380),0605-103812-dlsabfv3,1.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 10, numOutputBytes -> 3626, numAddedFiles -> 1)",,Databricks-Runtime/15.4.x-photon-scala2.12
1,2025-06-05T11:02:49Z,599420620358905,contact@loonycorn.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 53949ff6-4ee1-402f-b711-ae6a56ac2981, epochId -> 0, statsOnLoad -> false)",,List(2326614934370380),0605-103812-dlsabfv3,0.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 43, numOutputBytes -> 6862, numAddedFiles -> 2)",,Databricks-Runtime/15.4.x-photon-scala2.12
0,2025-06-05T11:02:42Z,599420620358905,contact@loonycorn.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 53949ff6-4ee1-402f-b711-ae6a56ac2981, epochId -> -1, statsOnLoad -> false)",,List(2326614934370380),0605-103812-dlsabfv3,,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-photon-scala2.12
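
The `operationMetrics` column in the history above shows how many rows and files each micro-batch added. As a rough sketch, the exported `Map(...)` strings can be parsed and totalled in plain Python. In a live notebook the column is a map type rather than a string, so this parsing is only needed for exported output like the one shown here.

```python
import re

# operationMetrics values copied from the history output above, keyed by version.
operation_metrics = {
    3: "Map(numRemovedFiles -> 0, numOutputRows -> 10, numOutputBytes -> 2916, numAddedFiles -> 1)",
    2: "Map(numRemovedFiles -> 0, numOutputRows -> 10, numOutputBytes -> 3626, numAddedFiles -> 1)",
    1: "Map(numRemovedFiles -> 0, numOutputRows -> 43, numOutputBytes -> 6862, numAddedFiles -> 2)",
    0: "Map()",
}


def parse_metrics(text):
    """Turn 'Map(key -> 1, other -> 2)' into {'key': 1, 'other': 2}."""
    return {key: int(value) for key, value in re.findall(r"(\w+) -> (\d+)", text)}


parsed = {version: parse_metrics(text) for version, text in operation_metrics.items()}
total_rows = sum(m.get("numOutputRows", 0) for m in parsed.values())
total_files = sum(m.get("numAddedFiles", 0) for m in parsed.values())
print(total_rows, total_files)
```

The file total should match the number of Parquet files in the table directory.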


In [0]:
%fs ls /delta/headphones

path,name,size,modificationTime
dbfs:/delta/headphones/_delta_log/,_delta_log/,0,1749121361000
dbfs:/delta/headphones/part-00000-099c6810-3d69-4a9f-b444-4152d5f32525.c000.snappy.parquet,part-00000-099c6810-3d69-4a9f-b444-4152d5f32525.c000.snappy.parquet,3626,1749121582000
dbfs:/delta/headphones/part-00000-33cbb84e-de53-47e8-8217-ef7d87264ba2.c000.snappy.parquet,part-00000-33cbb84e-de53-47e8-8217-ef7d87264ba2.c000.snappy.parquet,3538,1749121368000
dbfs:/delta/headphones/part-00000-a625d37b-152c-4116-8e8e-4d03279b04ce.c000.snappy.parquet,part-00000-a625d37b-152c-4116-8e8e-4d03279b04ce.c000.snappy.parquet,2916,1749121638000
dbfs:/delta/headphones/part-00001-d0dd3373-65dc-41b3-a260-e6f15d0221f4.c000.snappy.parquet,part-00001-d0dd3373-65dc-41b3-a260-e6f15d0221f4.c000.snappy.parquet,3324,1749121368000



#### Idempotent streaming

* Delete a file from the source folder and show that its data is still present in the Delta table
* Re-add the deleted file and show that its data is not reloaded into the table (Auto Loader streaming is idempotent)
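
The behaviour above can be pictured with a toy model: the stream remembers which file paths it has already ingested and skips them on later listings. This is a conceptual sketch only, not Auto Loader's actual implementation. The class name and file names are hypothetical.

```python
class SeenFilesCheckpoint:
    """Toy stand-in for a checkpoint that records ingested file paths."""

    def __init__(self):
        self._seen = set()

    def new_files(self, listed_paths):
        """Return only the paths not ingested before, and remember them."""
        fresh = [path for path in listed_paths if path not in self._seen]
        self._seen.update(fresh)
        return fresh


checkpoint = SeenFilesCheckpoint()

# First listing: the file is new, so it is ingested.
first_batch = checkpoint.new_files(["headphone_data/file_01.csv"])

# The file is deleted from the source: nothing to ingest, and the table is untouched.
after_delete = checkpoint.new_files([])

# The same file is re-added under the same path: it is skipped.
after_readd = checkpoint.new_files(["headphone_data/file_01.csv"])

print(first_batch, after_delete, after_readd)
```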


#### Parsing extra columns

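* Open flipkart_headphones_extraCol.csv and show the extra "discount" column
* Upload the file to the source folder
* Because the stream uses the "rescue" schema evolution mode, the schema does not change; the extra column's values are captured in the _rescued_data column instead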
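
To read a rescued value back out, the `_rescued_data` string can be parsed as JSON. The sketch below assumes the column holds a JSON object with the rescued columns plus a `_file_path` entry for the source file. The sample string, including the discount value, is made up for illustration.

```python
import json


def extra_columns(rescued_json):
    """Return the rescued columns, dropping the source-file bookkeeping entry."""
    if not rescued_json:
        return {}  # rows that matched the schema have an empty _rescued_data
    payload = json.loads(rescued_json)
    return {key: value for key, value in payload.items() if key != "_file_path"}


# Hypothetical _rescued_data value for one row of the extra-column file.
sample_rescued = (
    '{"discount":"54% off",'
    '"_file_path":"s3://loony-delta-source-bucket/headphone_data/flipkart_headphones_extraCol.csv"}'
)

rescued = extra_columns(sample_rescued)
print(rescued)
```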


#### Rearranging columns

* Open flipkart_headphones_rearrangedCol.csv and show that the columns are now in a different order
* Upload the file to the source folder
* Once it is ingested, we should have 40 records
* Scroll to the bottom and confirm that the new records are parsed into the correct columns
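
The records land in the right columns because values are matched to columns by header name rather than by position. A plain-Python sketch of the same idea with `csv.DictReader`, using one row from the sample data. The rearranged column order shown here is hypothetical.

```python
import csv
import io

original = (
    "title,brand,color,type,avg_rating,num_of_ratings,selling_price,MRP\n"
    "boAt Rockerz 235v2,boAt,Black,In the Ear,3.4,48161,999,2990\n"
)

# Same record with the columns shuffled (hypothetical ordering).
rearranged = (
    "brand,title,MRP,selling_price,type,color,num_of_ratings,avg_rating\n"
    "boAt,boAt Rockerz 235v2,2990,999,In the Ear,Black,48161,3.4\n"
)


def read_records(text):
    """Parse CSV text into dicts keyed by the header names."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


same = read_records(original) == read_records(rearranged)
print(same)
```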