In [1]:
#cmontufar@wisc.edu
#mroytman@wisc.edu

In [2]:
#Part 3:

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

spark = (SparkSession.builder.appName("cs544")
         .config("spark.sql.shuffle.partitions", 10)
         .config("spark.ui.showConsoleProgress", False)
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2')
         .getOrCreate())

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "stations-json")
    .option("startingOffsets", "earliest")
    .load()
)

schema = "station STRING, date DATE, degrees DOUBLE, raining INTEGER"
df = (df.select(from_json(col("value").cast("string"), schema).alias("value")).select("value.*"))

In [8]:
df.isStreaming

True

In [9]:
from pyspark.sql.functions import *

counts_df = df.groupBy("station").agg(
    min("date").alias("start"),
    max("date").alias("end"),
    count("degrees").alias("measurements"),
    avg("degrees").alias("avg"),
    max("degrees").alias("max")
)

s = counts_df.writeStream.format("console").trigger(processingTime="5 seconds").outputMode("complete").start()
s.awaitTermination(30)
s.stop()

23/04/29 04:03:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2b923f39-0e13-421f-85d6-4f41c38827d3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/29 04:03:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+------------------+
|station|     start|       end|measurements|               avg|               max|
+-------+----------+----------+------------+------------------+------------------+
|      F|2000-01-01|2000-07-05|         187|48.947904439104626| 91.64854721269778|
|      K|2000-01-01|2000-07-05|         187| 38.09047603886158| 77.81071006033444|
|      I|2000-01-01|2000-07-05|         187|60.674067778352054|104.57891274754704|
|      N|2000-01-01|2000-07-05|         187| 43.88774160321495| 89.89182400725942|
|      E|2000-01-01|2000-07-05|         187| 53.13786560506357| 91.36728474057894|
|      J|2000-01-01|2000-07-05|         187| 64.82885978951383|108.70011908893918|
|      A|2000-01-01|2000-07-05|         187| 53.37385214312269| 94.80152869347043|
|      H|2000-01-01|2000-07-05|         187| 43.98722763613677| 89.459589

23/04/29 04:04:01 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 5848 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+------------------+
|station|     start|       end|measurements|               avg|               max|
+-------+----------+----------+------------+------------------+------------------+
|      K|2000-01-01|2000-07-08|         190|38.622090250739866| 77.81071006033444|
|      F|2000-01-01|2000-07-08|         190| 49.41408457359274| 91.64854721269778|
|      I|2000-01-01|2000-07-08|         190|61.390705179362534|110.13804397065772|
|      N|2000-01-01|2000-07-08|         190| 44.19091826319916| 89.89182400725942|
|      E|2000-01-01|2000-07-08|         190| 53.30842833349492| 91.36728474057894|
|      J|2000-01-01|2000-07-08|         190| 65.43032717874223|110.28619025734498|
|      A|2000-01-01|2000-07-08|         190| 53.94121755821584| 94.80152869347043|
|      H|2000-01-01|2000-07-08|         190| 44.58296140485102| 89.459589

23/04/29 04:04:26 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@20668251 is aborting.
23/04/29 04:04:26 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@20668251 aborted.
23/04/29 04:04:26 WARN TaskSetManager: Lost task 4.0 in stage 20.0 (TID 164) (dafffbac9d94 executor driver): TaskKilled (Stage cancelled)
23/04/29 04:04:26 WARN TaskSetManager: Lost task 5.0 in stage 20.0 (TID 165) (dafffbac9d94 executor driver): TaskKilled (Stage cancelled)


In [10]:
from pyspark.sql.functions import date_add

today = df.select("station", "date", (col("raining").cast("int")).alias("raining"))
yesterday = df.withColumn("date", date_add("date", -1)).select(
    "station", "date", col("degrees").alias("sub1degrees"), col("raining").cast("int").alias("sub1raining"))

two_days_ago = df.withColumn("date", date_add("date", -2)).select(
    "station", "date", col("degrees").alias("sub2degrees"), col("raining").cast("int").alias("sub2raining"))

features = yesterday.join(two_days_ago, ["station", "date"]).withColumn("month", month("date"))

s2 = (
    today
    .join(features, ["station", "date"])
    .repartition(1)
    .writeStream
    .format("parquet")
    .option("checkpointLocation", "checkpoint")
    .option("path", "parquet")
    .trigger(processingTime="1 minute")
    .start())

s2.awaitTermination(61) #We set it to over a minute to assure at least one is created
s2.stop()

23/04/29 04:04:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
#Part 4:

In [12]:
data = spark.read.parquet("parquet/")

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [14]:
assembler = VectorAssembler(inputCols=["month", "sub1degrees", "sub1raining", "sub2degrees", "sub2raining"], outputCol="features")
data = assembler.transform(data)

train_data, test_data = data.randomSplit([0.75, 0.25], seed=42)

dt_classifier = DecisionTreeClassifier(labelCol="raining", featuresCol="features")
dt_model = dt_classifier.fit(train_data)

In [15]:
print(dt_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_b579d834c827, depth=5, numNodes=31, numClasses=2, numFeatures=5
  If (feature 2 <= 0.5)
   If (feature 0 <= 2.5)
    Predict: 0.0
   Else (feature 0 > 2.5)
    If (feature 0 <= 5.5)
     If (feature 0 <= 3.5)
      Predict: 0.0
     Else (feature 0 > 3.5)
      If (feature 3 <= 75.08611467875605)
       Predict: 1.0
      Else (feature 3 > 75.08611467875605)
       Predict: 0.0
    Else (feature 0 > 5.5)
     If (feature 3 <= 45.84724427828422)
      Predict: 1.0
     Else (feature 3 > 45.84724427828422)
      Predict: 0.0
  Else (feature 2 > 0.5)
   If (feature 0 <= 7.5)
    If (feature 1 <= 61.190297186384086)
     If (feature 0 <= 2.5)
      If (feature 3 <= 34.55790222682037)
       Predict: 1.0
      Else (feature 3 > 34.55790222682037)
       Predict: 0.0
     Else (feature 0 > 2.5)
      Predict: 1.0
    Else (feature 1 > 61.190297186384086)
     Predict: 1.0
   Else (feature 0 > 7.5)
    If (feature 4 <= 0.5)
     If (

In [16]:
from pyspark.sql.functions import mean
predictions = dt_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="raining", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

rain_avg = data.agg(mean(col("raining").cast("int")).alias("raining")).collect()[0][0]
print("Model Accuracy:", accuracy)
print("Raining Avg:", rain_avg)


Model Accuracy: 0.7557077625570776
Raining Avg: 0.3254421768707483


In [17]:
from pyspark.sql import Row

def prediction_batching(predictions, batch_size=8, sleep_time=5):
    total_rows = predictions.count()
    num_batches = (total_rows + batch_size - 1) // batch_size
    schema = predictions.select("station", "date", "prediction").schema

    for batch_num in range(3):
        start_index = batch_num * batch_size
        end_index = __builtins__.min(start_index + batch_size, total_rows)
        print("-" * 40)
        print(f"Batch: {batch_num + 1}")
        print("-" * 40)
        rows = predictions.select("station", "date", "prediction").take(end_index)[-batch_size:]
        predictions_batch = predictions.sql_ctx.createDataFrame(rows, schema)
        predictions_batch.show()
        time.sleep(sleep_time)

In [18]:
df.writeStream.foreachBatch(prediction_batching(predictions, 8, 5)).trigger(processingTime="5 seconds")

----------------------------------------
Batch: 1
----------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      A|2000-01-03|       0.0|
|      A|2000-01-07|       0.0|
|      A|2000-01-09|       0.0|
|      A|2000-01-10|       0.0|
|      A|2000-01-14|       0.0|
|      A|2000-01-20|       1.0|
|      A|2000-01-24|       0.0|
|      A|2000-01-30|       0.0|
+-------+----------+----------+

----------------------------------------
Batch: 2
----------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      A|2000-01-31|       0.0|
|      A|2000-02-02|       0.0|
|      A|2000-02-04|       1.0|
|      A|2000-02-05|       1.0|
|      A|2000-02-12|       0.0|
|      A|2000-02-15|       0.0|
|      A|2000-02-16|       0.0|
|      A|2000-02-17|       0.0|
+-------+----------+----------+

----------------------------------------
Batch: 

<pyspark.sql.streaming.DataStreamWriter at 0x7f02838341f0>