In [0]:
class batch_WordCount():
    """Batch word count over the text files under ``<base_data_dir>/data/text``.

    Pipeline: read raw tokens -> normalise/filter them -> count per word ->
    overwrite the Delta table ``word_count_table`` with the counts.

    Relies on a ``spark`` SparkSession being available as a global name
    (as in a Databricks notebook); it is not created here.
    """

    def __init__(self, base_data_dir="/FileStore/tables"):
        # Root directory; input text files are read from "<base_data_dir>/data/text".
        self.base_data_dir = base_data_dir

    def getRawData(self):
        """Return a DataFrame with a single ``word`` column, one row per token.

        Records are delimited by ``.`` (``lineSep``), not by newlines. Each
        record is then split on a single space and exploded into rows.
        """
        from pyspark.sql.functions import explode, split
        lines = (spark.read
                        .format("text")
                        .option("lineSep", ".")
                        .load(f"{self.base_data_dir}/data/text")
        )
        # NOTE(review): only a single space separates words here; newlines or
        # tabs inside a record stay part of a token - confirm that is intended.
        return lines.select(explode(split(lines.value, " ")).alias("word"))

    def getQualityData(self, rawDF):
        """Normalise and filter the raw tokens.

        Args:
            rawDF: DataFrame with a ``word`` column (as from ``getRawData``).

        Returns:
            DataFrame with a ``word`` column holding the trimmed, lower-cased
            tokens that are non-null and contain at least one letter a-z.
            Empty tokens produced by consecutive spaces fail the ``[a-z]``
            test and are therefore dropped.
        """
        from pyspark.sql.functions import trim, lower
        return ( rawDF.select(lower(trim(rawDF.word)).alias("word"))
                        .where("word is not null")
                        .where("word rlike '[a-z]'")
        )

    def overwriteWordCount(self, wordCountDF):
        """Write ``wordCountDF`` to the Delta table ``word_count_table``, replacing its contents."""
        ( wordCountDF.write
                    .format("delta")
                    .mode("overwrite")
                    .saveAsTable("word_count_table")
        )

    def getWordCount(self, qualityDF):
        """Return per-word counts (columns ``word`` and ``count``) for ``qualityDF``."""
        return qualityDF.groupBy("word").count()

    def wordCount(self):
        """Run the full batch pipeline once and overwrite ``word_count_table``."""
        print("\tExecuting Word Count...", end='')
        rawDF = self.getRawData()
        qualityDF = self.getQualityData(rawDF)
        resultDF = self.getWordCount(qualityDF)
        self.overwriteWordCount(resultDF)
        print("Done")


In [0]:
class stream_WordCount():
    """Streaming word count over the text files under ``<base_data_dir>/data/text``.

    Same transformation as the batch version, but the source is read with
    ``spark.readStream``. The aggregated counts are continuously written to
    the Delta table ``word_count_table`` in ``complete`` output mode.

    Relies on a ``spark`` SparkSession being available as a global name
    (as in a Databricks notebook); it is not created here.
    """

    def __init__(self, base_data_dir="/FileStore/tables"):
        # Root directory for the input text files and the checkpoint directory.
        self.base_data_dir = base_data_dir

    def getRawData(self):
        """Return a streaming DataFrame with a single ``word`` column, one row per token.

        Records are delimited by ``.`` (``lineSep``), not by newlines. Each
        record is then split on a single space and exploded into rows.
        """
        from pyspark.sql.functions import explode, split
        lines = (spark.readStream
                        .format("text")
                        .option("lineSep", ".")
                        .load(f"{self.base_data_dir}/data/text")
        )
        # NOTE(review): only a single space separates words here; newlines or
        # tabs inside a record stay part of a token - confirm that is intended.
        return lines.select(explode(split(lines.value, " ")).alias("word"))

    def getQualityData(self, rawDF):
        """Normalise and filter the raw tokens.

        Args:
            rawDF: DataFrame with a ``word`` column (as from ``getRawData``).

        Returns:
            DataFrame with a ``word`` column holding the trimmed, lower-cased
            tokens that are non-null and contain at least one letter a-z.
            Empty tokens produced by consecutive spaces fail the ``[a-z]``
            test and are therefore dropped.
        """
        from pyspark.sql.functions import trim, lower
        return ( rawDF.select(lower(trim(rawDF.word)).alias("word"))
                        .where("word is not null")
                        .where("word rlike '[a-z]'")
        )

    def overwriteWordCount(self, wordCountDF):
        """Start a streaming write of ``wordCountDF`` into the Delta table ``word_count_table``.

        Returns:
            The streaming query handle returned by ``toTable``.
        """
        return ( wordCountDF.writeStream
                    .format("delta")
                    # NOTE: the directory name is spelled "chekpoint". It is kept
                    # as-is so existing checkpoint state, and any code elsewhere
                    # that may reference this exact path, keeps working.
                    .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/word_count")
                    # "complete" mode: every trigger writes the whole aggregated
                    # result, so the table always holds the current totals.
                    .outputMode("complete")
                    .toTable("word_count_table")
        )

    def getWordCount(self, qualityDF):
        """Return per-word counts (columns ``word`` and ``count``) for ``qualityDF``."""
        return qualityDF.groupBy("word").count()

    def wordCount(self):
        """Start the streaming pipeline and return the running query.

        "Done" is printed once the query has been started. It does not mean
        the stream has finished; callers must stop or await the returned query.
        """
        print("\tStarting Word Count Streaming...", end='')
        rawDF = self.getRawData()
        qualityDF = self.getQualityData(rawDF)
        resultDF = self.getWordCount(qualityDF)
        sQuery = self.overwriteWordCount(resultDF)
        print("Done")
        return sQuery