In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import greatest
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = (
    SparkSession.builder
    .appName("FileStreamToMemory")
    .getOrCreate()
)

# Word prefixes of the 24 hourly reading columns, in file order
# ("one" -> "one_hour", ..., "twenty_four" -> "twenty_four_hour").
hour_words = [
    "one", "two", "three", "four", "five", "six",
    "seven", "eight", "nine", "ten", "eleven", "twelve",
    "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen",
    "nineteen", "twenty", "twenty_one", "twenty_two", "twenty_three", "twenty_four",
]

# Input row layout: a string Date, 24 nullable double hourly readings,
# then a nullable double AVG_Temp.
schema = StructType(
    [StructField("Date", StringType(), True)]
    + [StructField(f"{word}_hour", DoubleType(), True) for word in hour_words]
    + [StructField("AVG_Temp", DoubleType(), True)]
)


In [2]:

# Directory watched for new CSV files.
# NOTE(review): "spark-warehouse" is also Spark's default warehouse directory
# name. Confirm that this is really the intended landing folder for input files.
INPUT_DIR = "spark-warehouse"

# Streaming CSV source. The first line of each file is treated as a header,
# and columns are typed by `schema` rather than inferred.
file_stream_df = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .csv(INPUT_DIR)
)


In [3]:
# Hourly reading columns, derived from `schema` so this list cannot drift
# out of sync with the input definition.
hour_columns = [field.name for field in schema.fields if field.name.endswith("_hour")]

# Peak load per date:
# - greatest() takes the row-wise max across the hourly columns, skipping nulls.
# - The aggregation then keeps the max over all rows that share a Date.
peak_load_df = (
    file_stream_df
    .withColumn("PeakLoad", greatest(*hour_columns))
    .groupBy("Date")
    .agg(F.max("PeakLoad").alias("PeakLoad"))
    # NOTE(review): Date is a StringType column, so this sort is lexicographic,
    # not chronological. The captured output lists 01/01/2010, 01/01/2011, ...
    # before 01/02/2010. Sort on to_date(...) once the date format
    # (MM/dd vs dd/MM) is confirmed.
    .orderBy("Date")
)

# Complete output mode re-emits the whole aggregate on every trigger; Spark
# only allows sorting a streaming result after an aggregation in this mode.
# The memory sink collects the output on the driver and exposes it as the
# temporary table "peak_load_data". It is intended for small, debug-sized
# results.
query = (
    peak_load_df.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("peak_load_data")
    .start()
)


In [None]:
from IPython.display import display, clear_output
import time

# Seconds to wait between refreshes of the in-memory result table.
REFRESH_INTERVAL_S = 5

# Poll the memory-sink table while the streaming query is running.
# - The snapshot gets its own name so that `peak_load_df` stays the streaming
#   DataFrame and cell 3 can still be re-run.
# - .show() prints the table itself and returns None, so it is not wrapped in
#   display(); doing so would display "None".
try:
    while query.isActive:
        clear_output(wait=True)
        peak_load_snapshot = spark.table("peak_load_data")
        peak_load_snapshot.show(truncate=False)
        time.sleep(REFRESH_INTERVAL_S)  # avoid busy-spinning the driver
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop watching. The
    # streaming query itself keeps running until query.stop() is called.
    pass

# If the loop ended because the query died, surface the failure instead of
# exiting silently.
if not query.isActive and query.exception() is not None:
    raise query.exception()


+----------+--------+
|Date      |PeakLoad|
+----------+--------+
|01/01/2010|17190.0 |
|01/01/2011|19500.0 |
|01/01/2012|20660.0 |
|01/01/2013|21625.0 |
|01/01/2014|22325.0 |
|01/02/2010|18080.0 |
|01/02/2011|16250.0 |
|01/02/2012|20430.0 |
|01/02/2013|19800.0 |
|01/02/2014|22050.0 |
|01/03/2010|18025.0 |
|01/03/2011|19200.0 |
|01/03/2012|20775.0 |
|01/03/2013|20000.0 |
|01/03/2014|22403.0 |
|01/04/2010|18660.0 |
|01/04/2011|18350.0 |
|01/04/2012|20700.0 |
|01/04/2013|22750.0 |
|01/04/2014|22495.0 |
+----------+--------+
only showing top 20 rows



None

In [None]:
# Stop the streaming query started in cell 3; its memory-sink table will no
# longer be updated.
query.stop()