### Importing required libraries:

In [1]:
# Regular Expression
import re
import string
import os

import nltk
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
from nltk import word_tokenize, sent_tokenize
from sklearn.feature_extraction.text import CountVectorizer

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
import logging

# For saving and loading machine learning model
import pickle
import warnings
# Silence every warning category for the whole process (identical effect to
# warnings.filterwarnings("ignore")).
# NOTE(review): this also hides any warnings raised later, e.g. while
# unpickling the downloaded model — consider narrowing it.
warnings.simplefilter("ignore")

### Credentials:

In [2]:
# Point the Google Cloud client libraries at a service-account key file.
# A raw string keeps the Windows backslash literal: the previous
# "D:\classifier.json" only produced the right value because "\c" is not a
# recognised escape, and it triggers an invalid-escape-sequence warning
# (a SyntaxWarning since Python 3.12).
# NOTE(review): machine-specific path; consider setting it outside the code.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"D:\classifier.json"

### Apache Beam Pipeline:

In [3]:
stopWordLst = stopwords.words('english')
stemmer = SnowballStemmer('english')

# Membership tests against a list cost O(len(list)) per token; a frozenset
# built once gives O(1) lookups. ``stopWordLst`` stays a list for any other
# code that uses it.
_STOP_WORDS = frozenset(stopWordLst)
# Everything that is not a lowercase ASCII letter is replaced with a space
# (applied after lowercasing, so digits, punctuation and non-ASCII letters
# all become separators).
_NON_ALPHA_RE = re.compile(r'[^a-z]')


def text_process(text):
    """Normalise one raw message for the classifier.

    Steps: lowercase, replace every character outside ``a-z`` with a space,
    tokenize with ``nltk.word_tokenize``, drop English NLTK stop words and
    reduce each remaining token to its Snowball stem.

    Args:
        text: the raw message string.

    Returns:
        dict with ``'original'`` (the input, unchanged) and ``'processed'``
        (the surviving stems joined by single spaces).
    """
    cleaned = _NON_ALPHA_RE.sub(' ', text.lower())
    tokens = word_tokenize(cleaned)
    stems = [stemmer.stem(token) for token in tokens if token not in _STOP_WORDS]
    return {'original': text, 'processed': ' '.join(stems)}

def finalResult(dic1):
    """Return only the prediction label stored under ``'result'``.

    Reduces an annotated record dict to the single value that should be
    written to the output file. Raises ``KeyError`` if *dic1* has no
    ``'result'`` entry.
    """
    return dic1['result']
    
def download_model(bucket_name=None, source_blob_name=None, project=None, destination_file_name=None):
    """Copy a single Google Cloud Storage object to a local file.

    Args:
        bucket_name: name of the bucket that holds the object.
        source_blob_name: object name inside that bucket.
        project: GCP project id handed to ``storage.Client``.
        destination_file_name: local path the object's bytes are written to.
    """
    client = storage.Client(project)
    target_blob = client.get_bucket(bucket_name).blob(source_blob_name)
    target_blob.download_to_filename(destination_file_name)
    
class PredictSpam(beam.DoFn):
    """Beam DoFn that labels each preprocessed message as 'Normal' or 'Spam'.

    The trained model is stored pickled in a Google Cloud Storage bucket. It
    is downloaded and unpickled once per DoFn instance in ``setup`` and then
    reused for every element handled by ``process``.
    """

    # Fallbacks used when the constructor is called without arguments
    # (the pipeline in ``run`` instantiates it as ``PredictSpam()``).
    _DEFAULT_PROJECT = 'text-classifier-89064107'
    _DEFAULT_BUCKET = 'ted_test_data'
    _DEFAULT_MODEL_PATH = 'model.sav'
    _DEFAULT_DESTINATION = 'model.sav'

    def __init__(self, project=None, bucket_name=None, model_path=None, destination_name=None):
        """Record where the trained model is fetched from.

        Args:
            project: GCP project id; ``None`` -> ``_DEFAULT_PROJECT``.
            bucket_name: bucket holding the model; ``None`` -> ``_DEFAULT_BUCKET``.
            model_path: object name of the pickled model inside the bucket;
                ``None`` -> ``_DEFAULT_MODEL_PATH``.
            destination_name: local file the model is downloaded to;
                ``None`` -> ``_DEFAULT_DESTINATION``.
        """
        # Initialise DoFn base-class state (the original skipped this call).
        beam.DoFn.__init__(self)
        self._model = None  # loaded lazily in setup()
        self._project = project if project is not None else self._DEFAULT_PROJECT
        self._bucket_name = bucket_name if bucket_name is not None else self._DEFAULT_BUCKET
        self._model_path = model_path if model_path is not None else self._DEFAULT_MODEL_PATH
        self._destination_name = (destination_name if destination_name is not None
                                  else self._DEFAULT_DESTINATION)

    def setup(self):
        """Download the pickled model from GCS and load it into ``self._model``."""
        logging.info("Naive Bayes model %s", self._model_path)
        download_model(bucket_name=self._bucket_name,
                       source_blob_name=self._model_path,
                       project=self._project,
                       destination_file_name=self._destination_name)

        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file; only load models from a bucket whose contents you control.
        # The ``with`` block closes the handle (the original leaked it).
        with open(self._destination_name, 'rb') as model_file:
            self._model = pickle.load(model_file)

    def process(self, element):
        """Predict the label of one preprocessed message.

        Args:
            element: dict with a ``'processed'`` key holding the normalised
                text (as built by ``text_process``); it is not modified.

        Returns:
            A one-item list: ``'Normal'`` when the model predicts 0,
            ``'Spam'`` for any other prediction.
        """
        # Assumes a scikit-learn style ``predict`` that accepts a list of raw
        # strings (e.g. a vectorizer + classifier Pipeline) — TODO confirm.
        predicted = self._model.predict([element['processed']])[0]
        label = 'Normal' if predicted == 0 else 'Spam'
        # Beam forbids mutating input elements, so the label is not written
        # back into ``element``. To emit the full record instead (e.g. for a
        # BigQuery sink), return [dict(element, result=label)].
        return [label]

# Google Cloud identifiers shared by the pipeline definitions in this file.
projectName = 'text-classifier-89064107'
bucketName = 'ted_test_data'

# Input file (read line by line) and output prefix, both in the bucket above.
inputfile = f'gs://{bucketName}/df_test.csv'
outputfile = f'gs://{bucketName}/result'

# BigQuery destination and schema; run() does not write to BigQuery.
datasetId = f'{projectName}:classifier_test_case'
tableId = f'{datasetId}.prediction'
predicted_table_schema = 'original_text:STRING, processed_text:STRING, result:STRING'

def run(argv=None):
    """Build the spam-classification pipeline, run it and wait for completion.

    Reads messages from ``inputfile``, normalises them with ``text_process``,
    labels them with ``PredictSpam`` and writes the labels as ``.csv`` shards
    (each with a ``result`` header line) under the ``outputfile`` prefix.

    Args:
        argv: pipeline flags such as ``['--runner=DataflowRunner', ...]``.
            Previously ignored; ``None`` keeps the old behaviour of letting
            ``PipelineOptions`` parse ``sys.argv``.

    Returns:
        The finished run's ``PipelineResult``.
    """
    options = PipelineOptions(argv)
    p = beam.Pipeline(options=options)
    (p
     # NOTE(review): every line of the input is treated as one message; if
     # df_test.csv starts with a header row (or has extra columns), pass
     # skip_header_lines=1 / parse the CSV — confirm against the data.
     | 'Reading from google cloud' >> beam.io.ReadFromText(inputfile)
     | 'Applying text preprocess' >> beam.Map(text_process)
     | 'Applying model on real time data' >> beam.ParDo(PredictSpam())
     | 'Writing to google cloud' >> beam.io.WriteToText(outputfile,
                                             file_name_suffix='.csv',
                                             header='result')
    )
    result = p.run()
    result.wait_until_finish()
    return result


"""
# The following pipeline will write the results within a predefined dataset in google cloud:
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(datapath)
      | 'Process' >> beam.Map(text_process)
      | 'Prdict spam/ham label' >> beam.ParDo(PredictSklearn())
      | 'Print output' >> beam.Map(print_row)
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
      | 'Write results' >> beam.io.WriteToBigQuery(table=tableId,
                                                   schema = predicted_table_schema,
                                                   dataset=datasetId,
                                                   project=projectName,
                                                   batch_size=int(100),
                                                   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                   write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                   method="STREAMING_INSERTS")
  )
  
# To delete and insert data we can use:
# beam.io.BigQueryDisposition.WRITE_TRUNCATE
"""
    
if __name__ == '__main__':
    # Lower the root logger's threshold to INFO so pipeline progress
    # messages are emitted, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token


INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.33.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x000002BB6F1041F0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_Writing-to-google-cloud-Write-WriteImpl-DoOnce-Impulse_14)+(ref_AppliedPTransform_Writing-to-google-cloud-Write-WriteImpl-DoOnce-FlatMap-lambda-at-core-py-2968-_15))+(ref_AppliedPTransform_Writing-to-google-cloud-Write-WriteImpl-DoOnce-Map-decode-_17))+(ref_AppliedPTransform_Writing-to-google-cloud-Write-WriteImpl-InitializeWrite_18))+(ref_PCollection_PCollection_8/Write))+(ref_PCollection_PCollection_9/Write)
INFO:apache_beam.runners.portab