#### Building a simple word count application
* First, we will build a batch application that solves the word count problem.
* Then we will convert the batch application into a streaming application.

In [0]:
from pyspark.sql import functions as F
class WordCount():
    """Batch word-count job: read text files, count words, save a Delta table.

    The pipeline stages are exposed as separate methods and chained together
    by ``startProcess``. The class expects a global ``spark`` session and the
    ``F`` alias for ``pyspark.sql.functions`` to be in scope (as in a
    notebook environment).
    """

    def __init__(self, base_data_dir="/FileStore/ram",
                 table_name="word_count_table"):
        """Configure the job.

        Args:
            base_data_dir: Root directory; input text files are read from
                ``<base_data_dir>/data/text/``.
            table_name: Name of the table the word counts are written to.
        """
        self.base_data_dir = base_data_dir
        self.table_name = table_name
        print("Started the process.")

    def _raw_data_path(self):
        """Return the directory the raw text files are loaded from."""
        return f"{self.base_data_dir}/data/text/"

    def getRawData(self):
        """Load the raw text files as a DataFrame using the ``text`` format."""
        print("Bringing the raw data.")
        # The path must come from the instance attribute; a bare
        # `base_data_dir` is not defined in this scope and raises NameError.
        return spark.read.format('text').load(self._raw_data_path())

    def explodeData(self, df):
        """Split each ``value`` on single spaces and emit one ``word`` per row."""
        print("Exploding the data.")
        return df.select(F.explode(F.split(df.value, ' ')).alias('word'))

    def qualityAnalysis(self, df):
        """Normalize words (trim + lower-case) and drop unusable entries.

        Rows are kept only when ``word`` is not null and matches the regular
        expression ``[a-z]``.
        """
        print("Running Quality.")
        return (
            df.select(F.lower(F.trim(df.word)).alias('word'))
            .filter('word is not null')
            .filter("word rlike '[a-z]'")
        )

    def aggregareCount(self, df):
        """Count occurrences of each distinct ``word``.

        The method name is misspelled; it is kept so existing callers keep
        working. ``aggregateCount`` is provided as a correctly spelled alias.
        """
        print("Performing Aggregation.")
        return df.groupBy('word').count()

    # Correctly spelled alias for the method above.
    aggregateCount = aggregareCount

    def writeTable(self, df):
        """Overwrite the configured table with ``df`` in Delta format."""
        print("Writing the data into table.")
        return (
            df.write
            .format('delta')
            .mode('overwrite')
            .saveAsTable(self.table_name)
        )

    def startProcess(self):
        """Run the full pipeline: read, explode, clean, aggregate, write."""
        raw_data = self.getRawData()
        exploded_df = self.explodeData(raw_data)
        quality_df = self.qualityAnalysis(exploded_df)
        count_df = self.aggregareCount(quality_df)
        self.writeTable(count_df)
        print("Done with the load")
    

    

In [0]:
# Run the batch word-count pipeline end to end.
WordCount().startProcess()