In [None]:
%run ./03-streaming-word-count

In [None]:
import time

class StreamingWordCountTestSuite:
    """End-to-end test harness for the streaming word-count pipeline.

    Each iteration copies one more input file into the landing directory,
    runs the streaming query returned by ``StreamingWordCount.wordCount()``
    (defined in the notebook pulled in via ``%run``), waits for it to
    terminate, and checks the total count of words starting with ``'s'``.

    Relies on the notebook globals ``dbutils`` and ``StreamingWordCount``.
    """

    # Table the assertions read; presumably the sink of the streaming query.
    RESULT_TABLE = "word_counts_streaming"
    # Table the original cleanup dropped (likely left over from a batch
    # version of this exercise); still dropped so existing behavior is kept.
    LEGACY_TABLE = "word_counts"

    def __init__(self, spark, base_dir="/Volumes/workspace/default/spark_streaming"):
        """Store the Spark session and the root folder for data/checkpoints.

        Args:
            spark: Active SparkSession used for SQL and passed to the pipeline.
            base_dir: Root folder holding ``datasets/``, ``data/`` and
                ``checkpoint/``. Defaults to the original hard-coded volume.
        """
        self.base_dir = base_dir
        self.spark = spark

    def cleanTests(self):
        """Drop result tables and reset the landing and checkpoint folders."""
        print("Performing cleanup tasks")
        # Also drop the table assertResult reads; otherwise results from a
        # previous run could remain while the checkpoint below is wiped.
        for table in (self.LEGACY_TABLE, self.RESULT_TABLE):
            self.spark.sql(f"drop table if exists {table}")

        dbutils.fs.rm(f"{self.base_dir}/data/text", True)
        # NOTE(review): assumed to be the query's checkpoint location (set in
        # the word-count notebook) — confirm; removing it restarts the stream.
        dbutils.fs.rm(f"{self.base_dir}/checkpoint", True)

        dbutils.fs.mkdirs(f"{self.base_dir}/data/text")
        print("Done!")

    def ingestData(self, itr):
        """Copy ``datasets/text/text_data_{itr}.txt`` into the landing folder."""
        print("Starting ingestion ...")
        dbutils.fs.cp(f"{self.base_dir}/datasets/text/text_data_{itr}.txt", f"{self.base_dir}/data/text/")
        print("Done!")

    def assertResult(self, expected_count):
        """Assert the summed count of words starting with 's' equals ``expected_count``.

        Raises:
            AssertionError: if the totals differ, including when no rows match
                (SQL ``sum`` over no rows is NULL, i.e. ``None`` here).
        """
        actual_count = self.spark.sql(
            f"select sum(count) from {self.RESULT_TABLE} where substr(word, 1, 1) == 's'"
        ).collect()[0][0]
        # f-string instead of %d: %d raised TypeError on None and masked the
        # actual assertion failure.
        assert actual_count == expected_count, f"Expected {expected_count}, found {actual_count}"

    def runStreamingBatch(self):
        """Start the word-count stream and block until the query terminates."""
        print("Running a one-shot streaming batch")

        # Use the suite's own session, not the notebook-global ``spark``.
        wc = StreamingWordCount(self.spark)

        query = wc.wordCount()

        # NOTE(review): this returns only if wordCount() uses a terminating
        # trigger (e.g. availableNow/once); otherwise it blocks indefinitely —
        # confirm in the word-count notebook.
        query.awaitTermination()
        print("Batch complete")

    def runTests(self):
        """Reset state, then ingest/run/assert for each test iteration."""
        self.cleanTests()

        # (input file index, expected cumulative count of 's' words)
        for itr, expected in ((1, 25), (2, 32)):
            print(f"\n▶ Test Iteration {itr}")
            self.ingestData(itr)
            self.runStreamingBatch()
            self.assertResult(expected)

        print("\nAll tests passed successfully!")

In [None]:
# Bind the suite to the notebook's active Spark session, then run all iterations.
wc_test_suite = StreamingWordCountTestSuite(spark=spark)
wc_test_suite.runTests()