In [1]:
#importing libraries 
import os 
import cloudpickle as cp 
import pandas as pd
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import datetime
import uuid


#parameters
KEY_FILE       = './key/gtdp-mlops-dev-70d84af4ad0f.json'
MODEL_PATH     = './Adeo_IrisShowcase_Model.pkl'
PROJECT        = 'gtdp-mlops-dev'
REGION         = 'eu'
TEMP_LOCATION  = 'gs://gtdp_mlops_dev_mlinput/temp'

OUTPUT_TABLE   = 'iris_output'
OUTPUT_DATASET = 'showcase'

OUTPUT_SCHEMA  = {
                    'fields': 
                    [
                        {'name':'job_id'        ,'type':'STRING'   , 'mode': 'REQUIRED'},
                        {'name':'input_data'    ,'type':'STRING'   , 'mode': 'REQUIRED'},
                        {'name':'transform_data','type':'STRING'   , 'mode': 'NULLABLE'},
                        {'name':'output_data'   ,'type':'STRING'   , 'mode': 'NULLABLE'},
                        {'name':'tec_dat_cre'   ,'type':'TIMESTAMP', 'mode': 'REQUIRED'},
                    ]
                 }

#to_debug
OUTPUT_FILE = './irisoutput_v2.csv'

INPUT_QUERY = 'SELECT * FROM showcase.iris_input LIMIT 100'

#settings
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = KEY_FILE


#loading the model from a serialized file
mySerializableObject = cp.load(open(MODEL_PATH,'rb'))

#class to execute the inferences in a parallel mode
class Predict(beam.DoFn):

  def process(self, input, job_id):

    #transformed = list(map(float,list(input.split(',')))) #any transformation needed
    #print(transformed)

    #transformed = [input[k] for k in input]
    #inference = mySerializableObject.models['LogisticRegression']['obj'].predict([transformed]) 
    inference = mySerializableObject.predict(input) 
    
    yield self.convertToOutput(job_id, input, {}, inference) # converts the output to the desired format


  def convertToOutput(self, job_id, input, transformed, inference):
   #'Sepal.Length,Sepal.Width,Petal.Length,Petal.Width
    output = {
      'job_id'          : job_id,
      'input_data'      : json.dumps(input, default=str),
      'transform_data'  : json.dumps(transformed,default=str), #json.dumps(transformed.to_dict('records'), default=str),
      'output_data'     : json.dumps(inference, default=str),
      'tec_dat_cre'     : datetime.datetime.now().timestamp()
    }
    return output

#creating a pipeline
beam_options = {}
p = beam.Pipeline(options=PipelineOptions(beam_options, project=PROJECT, region=REGION, temp_location=TEMP_LOCATION))

cancorders = (
    p
  #  | 'ReadLocal'       >> beam.io.ReadFromText('./inferences.csv')
    | 'ReadTable'    >> beam.io.ReadFromBigQuery(query=INPUT_QUERY)
    | 'Predict'      >> beam.ParDo(Predict(),uuid.uuid4().hex)
    | 'WriteLocal'   >> beam.io.WriteToText(OUTPUT_FILE) #useful to debug
#    | 'WriteToBQ'    >> beam.io.WriteToBigQuery(OUTPUT_TABLE, OUTPUT_DATASET, PROJECT, OUTPUT_SCHEMA, beam.io.BigQueryDisposition.CREATE_IF_NEEDED, beam.io.BigQueryDisposition.WRITE_APPEND)
)


p.run()






<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f727dd04040>

In [3]:
s = {'Sepal_Length': 6.8274866290096785, 'Sepal_Width': 4.203959685052126, 'Petal_Length': 2.932320571471628, 'Petal_Width': 0.34953103330249935}

In [4]:
[s[k] for k in s]

[6.8274866290096785, 4.203959685052126, 2.932320571471628, 0.34953103330249935]