In [0]:
%fs ls /FileStore/shared_uploads/hassom20042019@gmail.com/Occupancy/

path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/hassom20042019@gmail.com/Occupancy/Occupancy-1.csv,Occupancy-1.csv,1375350,1733180149000
dbfs:/FileStore/shared_uploads/hassom20042019@gmail.com/Occupancy/Occupancy-2.csv,Occupancy-2.csv,1375350,1733180866000
dbfs:/FileStore/shared_uploads/hassom20042019@gmail.com/Occupancy/Occupancy.csv,Occupancy.csv,1375350,1733179996000


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [0]:
# Reuse the active session when there is one, otherwise create it.
spark = SparkSession.builder.appName("OccStreaming").getOrCreate()

# Directory holding the uploaded Occupancy CSV files.
inputPath = "/FileStore/shared_uploads/hassom20042019@gmail.com/Occupancy/"

# Declare the schema up front instead of letting Spark infer it.
# The sensor readings are typed as doubles: with IntegerType any fractional
# reading (a humidity *ratio* in particular) would parse to null, and
# StringType would block numeric use of Temperature/Humidity.
# NOTE(review): assumes these five columns hold decimal readings — confirm
# against the files.
# NOTE(review): no `header` option is set, so a header line, if the files
# have one, is read as a (malformed) data row — confirm.
csvSchema = StructType([
    StructField("date", TimestampType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Light", DoubleType(), True),
    StructField("CO2", DoubleType(), True),
    StructField("HumidityRatio", DoubleType(), True),
    StructField("Occupancy", IntegerType(), True)
])

# NOTE(review): the directory listing shows three files with the same byte
# size; if they are copies of one another, every count produced below is
# multiplied by the number of files — confirm that this is intended.
streamingInputDF = (
    spark
    .readStream
    .schema(csvSchema)                # Set the schema of the csv data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .csv(inputPath)
)

# Count the rows per Occupancy value within each 1-day window of "date".
streamingOccDF = (
    streamingInputDF
    .groupBy(
        streamingInputDF.Occupancy,
        window(streamingInputDF.date, "1 day"))
    .count()
)

In [0]:
# Start the streaming job.
# A query name can be held by only one active query at a time, so stop any
# earlier run of this cell first; otherwise re-running the cell fails in start().
for activeQuery in spark.streams.active:
    if activeQuery.name == "occupancyq":
        activeQuery.stop()

query = (
  streamingOccDF
    .writeStream
    .format("memory")            # memory = store in-memory table (for testing only)
    .queryName("occupancyq")     # occupancyq = name of the in-memory table
    .outputMode("complete")      # complete = all the counts should be in the table
    .start()
)

In [0]:
%sql
-- Read the current contents of the in-memory table "occupancyq".
-- "time" is the end timestamp of each window; there is no ORDER BY, so the
-- row order of the result is not guaranteed.
SELECT
  Occupancy,
  date_format(window.end, "MMM-dd HH:mm") AS time,
  count
FROM occupancyq

Occupancy,time,count
0,Feb-05 00:00,2481
0,Feb-17 00:00,2667
0,Feb-09 00:00,4320
1,Feb-17 00:00,1653
0,Feb-15 00:00,4320
0,Feb-13 00:00,3588
0,Feb-19 00:00,1653
1,Feb-13 00:00,732
1,Feb-18 00:00,1611
1,Feb-11 00:00,162
