In [0]:
%run ./01_streamWC


In [0]:
class streamTestSuite():
    """End-to-end test harness for the streaming word count.

    Relies on names supplied by the notebook environment rather than imports:
    ``spark`` and ``dbutils`` (Databricks globals) and ``streamWC``
    (presumably defined by the ``%run ./01_streamWC`` cell — verify there).
    """

    def __init__(self):
        # Root folder that holds the text_0N.txt input files and the data/
        # landing folder the files are copied into.
        self.base_data_dir =  "/FileStore/week01"

    def cleanTests(self):
        """Reset the environment so a run starts from an empty state.

        Drops ``word_count_table``, removes its warehouse folder, removes the
        checkpoint and landing folders, then recreates an empty landing folder.
        """
        print("Starting Cleanup...", end='')
        spark.sql("drop table if exists word_count_table")
        # Also remove any files left behind under the table's warehouse folder.
        # (The path previously read "warehousr", so this rm never matched.)
        dbutils.fs.rm("/user/hive/warehouse/word_count_table", True)

        # NOTE(review): "chekpoint" looks misspelled, but it has to match the
        # checkpoint location configured in 01_streamWC, which is not visible
        # from this notebook — confirm both sides before renaming.
        dbutils.fs.rm(f"{self.base_data_dir}/chekpoint", True)
        dbutils.fs.rm(f"{self.base_data_dir}/data", True)

        dbutils.fs.mkdirs(f"{self.base_data_dir}/data")
        print("Done\n")

    def ingestData(self, itr):
        """Copy ``text_0{itr}.txt`` from the base folder into the landing folder.

        Args:
            itr: Number substituted into the ``text_0{itr}.txt`` file name.
        """
        print("\tStarting Ingestion...", end='')
        dbutils.fs.cp(f"{self.base_data_dir}/text_0{itr}.txt", f"{self.base_data_dir}/data/")
        print("Done")

    def assertResult(self, expected_count):
        """Check that ``sum(count)`` over ``word_count_table`` equals ``expected_count``.

        Args:
            expected_count: Value the aggregated count must equal.

        Raises:
            AssertionError: If the value read from the table differs.
        """
        print("\tStarting validation...", end='')
        actual_count=spark.sql("select sum(count) from word_count_table").collect()[0][0]
        assert expected_count==actual_count, f"Test failed! actual count is {actual_count}"
        print(actual_count)
        print("Done")

    def runTests(self):
        """Run the three ingest / wait / validate iterations against the stream.

        Cleans the environment, starts the word count query, and for each input
        file copies it in, waits a fixed interval, and validates the table
        total. The query is stopped when the run ends, whether it passed or not.

        Raises:
            AssertionError: If any iteration's validation fails.
        """
        import time
        sleepTime=30

        # (file number, start message, expected sum(count) once that file has
        # been ingested, completion message). Messages are spelled out in full
        # so the printed output stays exactly as before.
        iterations = [
            (1, "Testing first iteration of batch word count...", 29010,
             "First iteration of batch word count completed.\n"),
            (2, "Testing second iteration of batch word count...", 175797,
             "Second iteration of batch word count completed.\n"),
            (3, "Testing third iteration of batch word count", 252462,
             "Third iteration of batch word count completed.\n"),
        ]

        self.cleanTests()
        wc=streamWC()
        sQuery = wc.wordCount()

        try:
            for itr, startMsg, expected_count, doneMsg in iterations:
                print(startMsg)
                self.ingestData(itr)
                print(f"\tWaiting for {sleepTime} seconds...")
                # Fixed pause to give the stream time to pick up the new file.
                time.sleep(sleepTime)
                self.assertResult(expected_count)
                print(doneMsg)
        finally:
            # Stop the query even when an iteration fails; otherwise it would
            # keep running after the test aborts.
            sQuery.stop()

In [0]:
# Build the test suite and execute it: cleanup, start the word count query,
# then three ingest / wait / validate iterations. Progress is printed below.
swcTS = streamTestSuite()
swcTS.runTests()

Starting Cleanup...Done

	Starting Word Count Stream...Done
Testing first iteration of batch word count...
	Starting Ingestion...Done
	Waiting for 30 seconds...
	Starting validation...29010
Done
First iteration of batch word count completed.

Testing second iteration of batch word count...
	Starting Ingestion...Done
	Waiting for 30 seconds...
	Starting validation...175797
Done
Second iteration of batch word count completed.

Testing third iteration of batch word count
	Starting Ingestion...Done
	Waiting for 30 seconds...
	Starting validation...252462
Done
Third iteration of batch word count completed.

