In [0]:
# File location and type
file_location = "/FileStore/tables/Restaurant_customer_data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)
#df.show(10)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18
userID,latitude,longitude,smoker,drink_level,dress_preference,ambience,transport,marital_status,hijos,birth_year,interest,personality,religion,activity,color,weight,budget,height
U1001,22.139997,-100.978803,false,abstemious,informal,family,on foot,single,independent,1989,variety,thrifty-protector,none,student,black,69,medium,1.77
U1002,22.150087,-100.983325,false,abstemious,informal,family,public,single,independent,1990,technology,hunter-ostentatious,Catholic,student,red,40,low,1.87
U1003,22.119847,-100.946527,false,social drinker,formal,family,public,single,independent,1989,none,hard-worker,Catholic,student,blue,60,low,1.69
U1004,18.867,-99.183,false,abstemious,informal,family,public,single,independent,1940,variety,hard-worker,none,professional,green,44,medium,1.53
U1005,22.183477,-100.959891,false,abstemious,no preference,family,public,single,independent,1992,none,thrifty-protector,Catholic,student,black,65,medium,1.69
U1006,22.15,-100.983,true,social drinker,no preference,friends,car owner,single,independent,1989,variety,hard-worker,none,student,blue,75,medium,1.8
U1007,22.118464,-100.938256,false,casual drinker,informal,solitary,public,single,independent,1989,variety,thrifty-protector,Catholic,student,purple,60,low,1.59
U1008,22.122989,-100.923811,false,social drinker,formal,solitary,public,single,independent,1989,technology,hard-worker,Catholic,student,green,68,low,1.72
U1009,22.159427,-100.990448,false,abstemious,formal,family,on foot,single,kids,1991,variety,thrifty-protector,Catholic,student,green,75,medium,1.78


In [0]:
import os
os.makedirs("/mnt/streaming_data", exist_ok=True)


In [0]:
display(dbutils.fs.ls("/mnt"))


path,name,size,modificationTime
dbfs:/mnt/delta/,delta/,0,0


In [0]:
display(dbutils.fs.ls("/FileStore/tables/"))


path,name,size,modificationTime
dbfs:/FileStore/tables/Restaurant_customer_data.csv,Restaurant_customer_data.csv,21338,1742362919000
dbfs:/FileStore/tables/streaming_data.csv,streaming_data.csv,64,1742375130000
dbfs:/FileStore/tables/streaming_data_1.csv,streaming_data_1.csv,64,1742375520000
dbfs:/FileStore/tables/streaming_data_1742370203.csv,streaming_data_1742370203.csv,30,1742370204000
dbfs:/FileStore/tables/streaming_data_1742370986.csv,streaming_data_1742370986.csv,122810,1742370987000
dbfs:/FileStore/tables/streaming_data_1742370988.csv,streaming_data_1742370988.csv,123920,1742370989000
dbfs:/FileStore/tables/streaming_data_1742370990.csv,streaming_data_1742370990.csv,123920,1742370991000
dbfs:/FileStore/tables/streaming_data_1742375179.csv,streaming_data_1742375179.csv,122810,1742375180000
dbfs:/FileStore/tables/streaming_data_1742375181.csv,streaming_data_1742375181.csv,123920,1742375182000
dbfs:/FileStore/tables/streaming_data_1742375183.csv,streaming_data_1742375183.csv,123920,1742375184000


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()

# Define the schema explicitly
schema = StructType([
    StructField("EMPLOYEE_ID", StringType(), True),
    StructField("FIRST_NAME", StringType(), True),
    StructField("SALARY", IntegerType(), True)
])

# Read streaming data from a folder
df = spark.readStream.format("csv") \
    .option("header", True) \
    .schema(schema)  \
    .load("/FileStore/tables/")

# Apply a transformation
df_transformed = df.select("EMPLOYEE_ID", "FIRST_NAME", "SALARY")

# Write the output to console with a trigger interval of 5 seconds
query = df_transformed.writeStream.format("console") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()

query.awaitTermination()


In [0]:
import time

# Function to create a CSV file with streaming data
def create_streaming_file(file_num, rows=10000):
    csv_data = "EMPLOYEE_ID,FIRST_NAME,SALARY\n"  # CSV header
    for i in range(rows):
        csv_data += f"{file_num * 1000 + i},Employee_{i},{50000 + (i % 10000)}\n"

    file_path = f"/FileStore/tables/streaming_data_{int(time.time())}.csv"
    dbutils.fs.put(file_path, csv_data, True)
    print(f"📂 File {file_path} with {rows} rows added!")

# Generate multiple files to create a data spike
for i in range(3):  # 3 large files
    create_streaming_file(i, rows=5000)  # Each file has 5000 rows
    time.sleep(2)  # Small delay between file creations

print("🚀 Streaming spike triggered! Check console for new data.")


Wrote 122810 bytes.
📂 File /FileStore/tables/streaming_data_1742379352.csv with 5000 rows added!
Wrote 123920 bytes.
📂 File /FileStore/tables/streaming_data_1742379354.csv with 5000 rows added!
Wrote 123920 bytes.
📂 File /FileStore/tables/streaming_data_1742379356.csv with 5000 rows added!
🚀 Streaming spike triggered! Check console for new data.


In [0]:
query.stop()  # Stop the existing stream

# Restart the streaming query
query = df_transformed.writeStream.format("console") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()


##Using Triggers & Watermarks

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

spark = SparkSession.builder.appName("WatermarkingExample").getOrCreate()

# Define schema with timestamp column
schema = StructType([
    StructField("userID", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("event_time", TimestampType(), True),  # Event-time column for watermarking
    StructField("smoker", StringType(), True),
    StructField("drink_level", StringType(), True),
    StructField("weight", IntegerType(), True),
    StructField("height", IntegerType(), True),
])

# Read streaming data from a folder
df_streaming = spark.readStream.format("csv") \
    .option("header", "true") \
    .schema(schema)  \
    .load("/FileStore/tables/")

##Add Watermarking to Handle Late Data

In [0]:
from pyspark.sql.functions import window

df_with_watermark = df_streaming \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "smoker") \
    .count()


##Write the Output with a Trigger Interval

In [0]:
import time

def create_streaming_file(file_num, rows=5):
    csv_data = "userID,latitude,longitude,event_time,smoker,drink_level,weight,height\n"
    for i in range(rows):
        csv_data += f"user_{file_num*10 + i},37.7,-122.4,2025-03-19 12:0{file_num}:{i},yes,occasional,70,175\n"

    file_path = f"/FileStore/tables/streaming_data_{int(time.time())}.csv"
    dbutils.fs.put(file_path, csv_data, True)
    print(f"📂 File {file_path} with {rows} rows added!")

# Generate 3 files every few seconds
for i in range(10):
    create_streaming_file(i, rows=5)
    time.sleep(5)  # Wait 5 seconds before creating next file


Wrote 370 bytes.
📂 File /FileStore/tables/streaming_data_1742380869.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380874.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380880.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380885.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380890.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380895.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380900.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380905.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380910.csv with 5 rows added!
Wrote 375 bytes.
📂 File /FileStore/tables/streaming_data_1742380916.csv with 5 rows added!


In [0]:
query = df_with_watermark.writeStream.format("console") \
    .outputMode("complete") \
    .trigger(processingTime="5 seconds") \
    .start()


In [0]:
print(f"Is DataFrame Streaming? {df_streaming.isStreaming}")


Is DataFrame Streaming? True
