In [36]:
import google.cloud.aiplatform as aip

In [37]:
# Google Cloud project and region used for the Vertex AI SDK and the model handler below.
project = "vertex-ai-382806"
location = "us-central1"

# Register them as the defaults for subsequent Vertex AI SDK calls in this kernel.
aip.init(location=location, project=project)

In [38]:
# ID of the deployed Vertex AI endpoint queried by the pipeline below.
endpoint_id = "7747939582565416960"
endpoint = aip.Endpoint(endpoint_name=endpoint_id)

In [39]:
# Sanity check: show which model is deployed on the endpoint (expected: "churn").
# Fail with a clear message instead of an IndexError when nothing is deployed.
deployed_models = endpoint.list_models()
assert deployed_models, f"Endpoint {endpoint_id} has no deployed models"
deployed_models[0].display_name

'churn'

In [25]:
import apache_beam as beam
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.options.pipeline_options import PipelineOptions
from collections.abc import Iterable
import json
import ast


from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

In [None]:
#model_handler = VertexAIModelHandlerJSON(endpoint_id=endpoint_id, project=project, location=location) #.with_preprocess_fn(preprocess_image).with_preprocess_fn(download_image)


In [None]:
# Sample message payload as raw UTF-8 JSON bytes: a single JSON object, which is the
# shape Preprocess decodes with json.loads(message.decode("utf-8")).
example_json = b"""{
"user_pseudo_id": "1BE4F29852B390FC94D2A4E7382CCEBD",
"country": "Taiwan",
"operating_system": "ANDROID",
"language": "zh-tw",
"cnt_user_engagement": 71,
"cnt_level_start_quickplay": 28,
"cnt_level_end_quickplay": 18,
"cnt_level_complete_quickplay": 13,
"cnt_level_reset_quickplay": 0,
"cnt_post_score": 13,
"cnt_spend_virtual_currency": 0,
"cnt_ad_reward": 0,
"cnt_challenge_a_friend": 0,
"cnt_completed_5_levels": 0,
"cnt_use_extra_steps": 0,
"month": 9,
"julianday": 256,
"dayofweek": 5
}
"""

In [None]:
import json

# The same sample record as a Python dict. It is parsed from `example_json` rather
# than duplicated by hand, so the two copies cannot drift apart.
instances_initial = json.loads(example_json)

In [26]:
class Preprocess(beam.DoFn):
    """Decode raw Pub/Sub payloads into protobuf ``Value`` instances.

    Each incoming element is expected to be the UTF-8 encoded bytes of one JSON
    document. It is parsed and converted to a ``google.protobuf.struct_pb2.Value``,
    which is the element type handed to ``RunInference`` in this notebook.

    Messages that are not valid UTF-8 JSON are logged and dropped rather than
    failing the bundle.
    """

    def process(self, message):
        # Imports live inside ``process`` so they resolve on remote (Dataflow)
        # workers even when the notebook's main session is not pickled.
        import json
        import logging

        from google.protobuf import json_format
        from google.protobuf.struct_pb2 import Value

        try:
            instance_dict = json.loads(message.decode("utf-8"))
        except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
            # In a streaming job an unparseable message would otherwise fail the
            # bundle and be retried indefinitely; log it and skip it instead.
            logging.warning("Skipping malformed Pub/Sub message: %r", message[:200])
            return []
        return [json_format.ParseDict(instance_dict, Value())]
        
class Postprocess(beam.DoFn):
    """Turn a ``PredictionResult`` into a row for the BigQuery predictions table.

    Emits one dict per element with keys ``user_pseudo_id``, ``churn`` and
    ``probability``, and prints a short trace of each prediction.
    """

    def process(self, element):
        print("New Message:")
        input_example = element.example  # protobuf Value produced by Preprocess
        prediction_vals = element.inference

        # Extract each field once; the original evaluated them twice (print + row).
        user_pseudo_id = input_example.struct_value["user_pseudo_id"]
        churn = prediction_vals["predicted_churned"][0]
        # Max over ``churned_probs``. Presumably these are per-class probabilities,
        # so this is the confidence of the predicted class rather than P(churn).
        # NOTE(review): confirm this is what the ``probability`` column should hold.
        probability = max(prediction_vals["churned_probs"])

        print("input:")
        print(user_pseudo_id)
        print("predictions:")
        print(churn)
        print(probability)
        print("\n")

        return [{"user_pseudo_id": user_pseudo_id, "churn": churn, "probability": probability}]

In [40]:
# RunInference model handler that calls the Vertex AI endpoint configured above.
model_handler = VertexAIModelHandlerJSON(endpoint_id=endpoint_id, project=project, location=location)

# Pub/Sub source and BigQuery sink for the churn predictions.
subscription = "projects/vertex-ai-382806/subscriptions/churn_prediction_topic-sub"
bq_location = "vertex-ai-382806.cloud_summit_demo.predictions"

# Local streaming run (no runner specified, so Beam's default local runner is used).
# The `with` block only returns when the pipeline finishes, which for an unbounded
# Pub/Sub source means it runs until the cell is interrupted.
pipeline_options = PipelineOptions(streaming=True)

with beam.Pipeline(options=pipeline_options) as p:
    elements = (
        p
        | "Read PubSub" >> beam.io.ReadFromPubSub(subscription=subscription)
        | "Preprocess" >> beam.ParDo(Preprocess())
        | "Run Vertex Inference" >> RunInference(model_handler)
        | "Process Output" >> beam.ParDo(Postprocess())
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(table=bq_location)
    )



New Message:
input:
1BE4F29852B390FC94D2A4E7382CCEBD
predictions:
1
0.870379335564385




KeyboardInterrupt: 

# Run Beam pipeline as a Dataflow streaming job

In [41]:
from apache_beam.runners import DataflowRunner


In [42]:
from apache_beam.options.pipeline_options import GoogleCloudOptions, WorkerOptions, SetupOptions

# Apache Beam pipeline options for the Dataflow streaming job.
options = PipelineOptions(streaming=True)

# Run the Dataflow job in the same project used for Vertex AI (`project`),
# instead of repeating the project id literal.
options.view_as(GoogleCloudOptions).project = project


In [43]:

gcp_options = options.view_as(GoogleCloudOptions)

# Run Dataflow in the same region as the Vertex AI endpoint (`location`).
gcp_options.region = location

# Cloud Storage prefix for Dataflow's staging and temporary files.
dataflow_gcs_location = "gs://cloudsummit2024-demo-bucket/dataflow"

# Staging location: where the pipeline package and SDK are uploaded for workers.
gcp_options.staging_location = f"{dataflow_gcs_location}/staging"

# Temporary location: scratch space for intermediate files written during the job.
gcp_options.temp_location = f"{dataflow_gcs_location}/temp"

# Streaming mode is already enabled via PipelineOptions(streaming=True).

In [44]:
# Place Dataflow workers on the "default" VPC network, in its "uscentral1"
# subnetwork of region us-central1.
worker_options = options.view_as(WorkerOptions)
worker_options.network = "default"
worker_options.subnetwork = "regions/us-central1/subnetworks/uscentral1"

In [45]:
# Python dependencies to install on Dataflow workers. The relative path is resolved
# against the notebook's current working directory.
setup_options = options.view_as(SetupOptions)
setup_options.requirements_file = "requirements.txt"


In [46]:
# Show the explicitly-set pipeline options to double-check the configuration before submitting.
options.display_data()



{'streaming': True,
 'project': 'vertex-ai-382806',
 'staging_location': 'gs://cloudsummit2024-demo-bucket/dataflow/staging',
 'temp_location': 'gs://cloudsummit2024-demo-bucket/dataflow/temp',
 'region': 'us-central1',
 'network': 'default',
 'subnetwork': 'regions/us-central1/subnetworks/uscentral1',
 'requirements_file': 'requirements.txt'}

In [47]:
# Build the same streaming graph as the local run, this time with the
# Dataflow-configured `options`.
p = beam.Pipeline(options=options)
lines = (
    p
    | "Read PubSub" >> beam.io.ReadFromPubSub(subscription=subscription).with_output_types(bytes)
    | "Preprocess" >> beam.ParDo(Preprocess())
    | "Run Vertex Inference" >> RunInference(model_handler)
    | "Process Output" >> beam.ParDo(Postprocess())
    | "Write to BigQuery" >> beam.io.WriteToBigQuery(table=bq_location)
)



In [48]:
# Submit the pipeline to Dataflow. Keep the returned job handle so the streaming
# job can be inspected or cancelled later; it is also displayed as the cell output.
runner = DataflowRunner()
dataflow_result = runner.run_pipeline(p, options=options)
dataflow_result



<DataflowPipelineResult <Job
 clientRequestId: '20240606133355431765-9532'
 createTime: '2024-06-06T13:33:56.361899Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-06-06_06_33_55-1477578557408872782'
 location: 'us-central1'
 name: 'beamapp-jupyter-0606133355-430931-0izdvjra'
 projectId: 'vertex-ai-382806'
 stageStates: []
 startTime: '2024-06-06T13:33:56.361899Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7f8616f26710>