In [0]:
class batchWC():
    """Batch word-count job.

    Reads text files from ``<base_data_dir>/data/text``, breaks them into
    words, normalises and filters the words, counts occurrences per word and
    overwrites the Delta table ``word_count_table`` with the result.

    Relies on the notebook-provided global ``spark`` session.
    """

    def __init__(self):
        # Root directory under which the input text files are looked up.
        self.base_data_dir = "/Volumes/workspace/default"

    def getRawData(self):
        """Load the input text and return a DataFrame with one ``word`` per row.

        The text source is read with ``.`` as the record separator, and each
        record is then split on single spaces and exploded into rows.
        """
        from pyspark.sql.functions import explode, split

        source_path = f"{self.base_data_dir}/data/text"
        reader = spark.read.format("text").option("lineSep", ".")
        sentences = reader.load(source_path)

        tokens = split(sentences["value"], " ")
        return sentences.select(explode(tokens).alias("word"))

    def getQualityData(self, rawDF):
        """Return ``rawDF`` with ``word`` trimmed and lower-cased, keeping only
        non-null words that match the regex ``[a-z]``.
        """
        from pyspark.sql.functions import trim, lower

        normalized_word = lower(trim(rawDF["word"])).alias("word")
        cleaned = rawDF.select(normalized_word)
        present = cleaned.filter("word is not null")
        return present.filter("word rlike '[a-z]'")

    def getWordCount(self, qualityDF):
        """Return one row per distinct ``word`` with its ``count``."""
        per_word = qualityDF.groupBy("word")
        return per_word.count()

    def overwriteWordCount(self, wordCountDF):
        """Replace the contents of ``word_count_table`` with ``wordCountDF``."""
        delta_writer = wordCountDF.write.format("delta").mode("overwrite")
        delta_writer.saveAsTable("word_count_table")

    def wordCount(self):
        """Run the whole pipeline: read, clean, count, write."""
        # end='' keeps the cursor on the same line so "Done" lands next to it.
        print(f"\tExecuting Word Count...", end='')
        words = self.getRawData()
        clean_words = self.getQualityData(words)
        counts = self.getWordCount(clean_words)
        self.overwriteWordCount(counts)
        print("Done")

In [0]:
class batchWCTestSuite():
    """End-to-end test harness for the ``batchWC`` word-count job.

    Relies on the notebook-provided globals ``spark`` and ``dbutils`` and on
    the ``batchWC`` class defined elsewhere in this notebook.
    """

    def __init__(self):
        # Root directory holding the source datasets and the working data dir.
        self.base_data_dir = "/Volumes/workspace/default"

    def cleanTests(self):
        """Drop the result table and reset the working directories.

        Every path is derived from ``self.base_data_dir`` so that cleanup
        always targets the same location the rest of the suite uses.
        """
        print("Starting Cleanup...", end='')
        spark.sql("drop table if exists word_count_table")
        dbutils.fs.rm(f"{self.base_data_dir}/word_count_table", True)

        dbutils.fs.rm(f"{self.base_data_dir}/checkpoint", True)
        dbutils.fs.rm(f"{self.base_data_dir}/data", True)

        # Recreate an empty data directory for the ingestion steps.
        dbutils.fs.mkdirs(f"{self.base_data_dir}/data")
        print("Done\n")

    def ingestData(self, itr):
        """Copy ``text_data_<itr>.txt`` from ``datasets/text`` into ``data/text``.

        Args:
            itr: Number used to build the dataset file name.
        """
        print("\tStarting Ingestion...", end='')
        file_name = f"text_data_{itr}.txt"
        dbutils.fs.cp(
            f"{self.base_data_dir}/datasets/text/{file_name}",
            f"{self.base_data_dir}/data/text/{file_name}",
        )
        print("Done")

    def assertResult(self, expected_count):
        """Check the total count of words starting with ``s`` in the result table.

        Args:
            expected_count: Expected value of ``sum(count)`` over those words.

        Raises:
            AssertionError: If the value read from the table differs.
        """
        print("\tStarting validation...", end='')
        actual_count = spark.sql("select sum(count) from word_count_table where substr(word, 1, 1) == 's'").collect()[0][0]
        assert expected_count == actual_count, f"Test failed! actual count is {actual_count}"
        print("Done")

    def runTests(self):
        """Clean up, then run three ingest -> word count -> validate rounds.

        Files ingested in earlier rounds stay in ``data/text``, so each
        expected count covers all files ingested so far.
        """
        self.cleanTests()
        wc = batchWC()

        # (ordinal used in messages, dataset number, expected 's'-word total)
        iterations = (
            ("first", 1, 25),
            ("second", 2, 32),
            ("third", 3, 37),
        )
        for ordinal, itr, expected_count in iterations:
            print(f"Testing {ordinal} iteration of batch word count...")
            self.ingestData(itr)
            wc.wordCount()
            self.assertResult(expected_count)
            print(f"{ordinal.capitalize()} iteration of batch word count completed.\n")
    

In [0]:
# Instantiate the test suite and run it end to end (cleanup followed by the
# ingest / word-count / validate iterations defined in runTests).
bwcTS = batchWCTestSuite()
bwcTS.runTests()

Starting Cleanup...Done

Testing first iteration of batch word count...
	Starting Ingestion...Done
	Executing Word Count...Done
	Starting validation...Done
First iteration of batch word count completed.

Testing second iteration of batch word count...
	Starting Ingestion...Done
	Executing Word Count...Done
	Starting validation...Done
Second iteration of batch word count completed.

Testing third iteration of batch word count...
	Starting Ingestion...Done
	Executing Word Count...Done
	Starting validation...Done
Third iteration of batch word count completed.



In [0]:
# List the tables to confirm that word_count_table exists after the run.
spark.sql("show tables").show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
| default|word_count_table|      false|
+--------+----------------+-----------+



In [0]:
# Row count of the result table; the table holds one row per distinct word.
spark.sql("select count(*) from word_count_table").show()

+--------+
|count(*)|
+--------+
|     146|
+--------+



In [0]:
# Print the whole table: 146 is the row count reported by the count(*) query
# in the previous cell, so it must be updated if the input data changes.
spark.sql("select * from word_count_table").show(146)

+-----------------+-----+
|             word|count|
+-----------------+-----+
|            about|    1|
|          without|    2|
|           reason|    1|
|           static|    1|
|                a|    8|
|      computation|    3|
|       event-time|    1|
|              use|    1|
|             logs|    1|
|               it|    1|
|            spark|    4|
|  stream-to-batch|    1|
|        continues|    1|
|            fast,|    1|
|           arrive|    1|
|                r|    1|
|              can|    3|
|            final|    1|
|       guarantees|    3|
|               to|    6|
|               in|    4|
|            would|    1|
|             will|    2|
|           system|    1|
|          ensures|    1|
|           engine|    4|
|           python|    1|
|               or|    1|
|    checkpointing|    1|
|             take|    1|
|         scalable|    1|
|           joins,|    1|
|         finally,|    1|
|     exactly-once|    3|
|           stream|    2|
|dataset/dat