### <font color='#4285f4'>Prerequisite to run this notebook</font>

In [None]:
# Enable the managedkafka.googleapis.com API in the project named by the
# GOOGLE_CLOUD_PROJECT environment variable (expanded by the shell, not by Python).
!(gcloud services enable managedkafka.googleapis.com --project "${GOOGLE_CLOUD_PROJECT}")

In [None]:
!pip install kafka-python

In [None]:
!pip install google-generativeai

# Initialisation

In [None]:
import pandas as pd
import json
import os
import time
import google.generativeai as genai

In [None]:
# Project id comes from the environment; this raises KeyError when
# GOOGLE_CLOUD_PROJECT is not set.
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
# Region used below for the subnet, the Kafka cluster and the Dataflow job.
REGION = "us-central1"
# BigQuery location used for the Vertex AI connection and the remote model.
BQ_LOCATION = "us"
DATASET_ID = "cymbal_consumer_finance"
# The data bucket shares its name with the project.
DATA_BUCKET_NAME = PROJECT_ID

kafka_cluster_name = "ti-kafka-cluster-01"
kafka_topic_name = "customer-events-topic-01"

# Change <YOUR-LDAP> to your real ldap or pick any other name.
dataflow_bucket = "ti-dataflow-staging-<YOUR-LDAP>"# should not have logical delete on
dataflow_service_account = "ti-dataflow-service-kafka-<YOUR-LDAP>@<YOUR-LDAP>.iam.gserviceaccount.com" # Needs Role: roles/managedkafka.client
# The account running this notebook needs to have "Service Account User" on this account.

CONNECTION_NAME = "vertex-ai"
# createVertexAIConnection creates the connection under BQ_LOCATION, so the
# connection id has to be built from BQ_LOCATION; building it from REGION would
# name a connection that this notebook never creates.
connection = f"{PROJECT_ID}.{BQ_LOCATION}.{CONNECTION_NAME}"

# Change these to the VPC network / subnet you want to use.
network="default"
#network="colab-network"
subnet = "default"
#subnet = "colab-subnetwork"

#TODO: change this hardcoding.
life_events = ["New Child",
               "Graduation",
               "Relocation",
               "Retirement",
               "Home Purchase",
               "Medical Event",
               "Starting Business",
               "Marriage",
               "Divorce" ]


In [None]:
# Create the bucket that the Dataflow job uses for its staging and temp locations.
!(gcloud storage buckets create gs://{dataflow_bucket} \
    --project="{PROJECT_ID}")

In [None]:
# Create the bucket that receives the copied CSV data files.
!(gcloud storage buckets create gs://{DATA_BUCKET_NAME} \
    --project="{PROJECT_ID}")

In [None]:
# Create the subnet (range 10.10.0.0/20) in REGION on the configured network;
# the Kafka cluster and the Dataflow job both reference this subnet.
!(gcloud compute networks subnets create "{subnet}" \
    --project="{PROJECT_ID}" \
    --region="{REGION}" \
    --network="{network}" \
    --range="10.10.0.0/20")

In [None]:
from google.cloud import bigquery
# Module-level BigQuery client; GetTableSchema reads this global.
client = bigquery.Client()

# Functions

In [None]:
def PrettyPrintJson(json_string):
  """Prints a JSON document with 2-space indentation.

  Args:
    json_string: Text containing a JSON document.

  Returns:
    The same document re-serialized by json.dumps with default (compact,
    single-line) formatting.
  """
  parsed = json.loads(json_string)
  print(json.dumps(parsed, indent=2))
  return json.dumps(parsed)

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls the Google Cloud REST API passing in the current users credentials.

  Args:
    url: Full URL of the REST endpoint.
    http_verb: One of "GET", "POST", "PUT", "PATCH" or "DELETE".
    request_body: JSON-serializable payload sent with POST/PUT/PATCH requests.
      It is not sent for GET and DELETE (callers pass None or {}).

  Returns:
    The response body parsed from JSON.

  Raises:
    RuntimeError: If http_verb is not supported, or if the response status
      code is not 200.
  """
  # Reject an unsupported verb before refreshing credentials or using the network.
  if http_verb not in ("GET", "POST", "PUT", "PATCH", "DELETE"):
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  import requests
  import google.auth
  # The transport submodule has to be imported explicitly: "import google.auth"
  # alone does not guarantee that google.auth.transport.requests is loaded.
  import google.auth.transport.requests
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token = creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  else:
    # Only DELETE is left after the validation at the top.
    response = requests.delete(url, headers=headers)

  if response.status_code == 200:
    return json.loads(response.content)

  error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
  raise RuntimeError(error)

### IAM Functions

In [None]:
def setProjectLevelIamPolicy(accountWithPrefix, role):
  """Sets the Project Level IAM policy.

  Reads the current project policy, returns early when the member already
  holds the role, and otherwise writes the policy back with the member added.

  Args:
    accountWithPrefix: IAM member string including its type prefix,
      e.g. "serviceAccount:name@project.iam.gserviceaccount.com".
    role: Role to grant, e.g. "roles/aiplatform.user".

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """

  # Get the current bindings (if the account has access then skip)
  # https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
  project_id = f"{PROJECT_ID}"

  url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:getIamPolicy"

  request_body = { }
  json_result = restAPIHelper(url, "POST", request_body)
  print(f"setProjectLevelIamPolicy (GET) json_result: {json_result}")

  # Keep every existing binding, otherwise the existing permissions are lost.
  bindings = json_result.get("bindings", [])

  # Test to see if permissions exist
  for item in bindings:
    if item["role"] == role and accountWithPrefix in item.get("members", []):
      print("Permissions exist")
      return

  # Add the member to the existing unconditional binding for this role when
  # there is one, instead of appending a second binding for the same role.
  for item in bindings:
    if item["role"] == role and "condition" not in item:
      item.setdefault("members", []).append(accountWithPrefix)
      break
  else:
    bindings.append({
        "role": role,
        "members": [ accountWithPrefix ]
        })

  policy = { "bindings" : bindings }
  # Send back the etag that was read so a concurrent policy change makes this
  # write fail rather than silently overwriting the other change.
  if "etag" in json_result:
    policy["etag"] = json_result["etag"]

  # https://cloud.google.com/resource-manager/reference/rest/v1/projects/setIamPolicy
  url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:setIamPolicy"

  request_body = { "policy" : policy }

  print(f"Permission bindings: {bindings}")

  json_result = restAPIHelper(url, "POST", request_body)
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"Project Level IAM Permissions set for {accountWithPrefix} {role}")

In [None]:
def createVertexAIConnection(vertex_ai_connection_name):
  """Creates a Vertex AI connection.

  Looks for a BigQuery connection with the given id under BQ_LOCATION and
  creates it as a cloud-resource connection when it does not exist.

  Args:
    vertex_ai_connection_name: Connection id to find or create.

  Returns:
    The service account id attached to the existing or new connection.

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """

  # First find the connection
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list
  project_id = f"{PROJECT_ID}"
  bigquery_location = f"{BQ_LOCATION}"
  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections"

  # Gather existing connections
  json_result = restAPIHelper(url, "GET", None)
  print(f"createVertexAIConnection (GET) json_result: {json_result}")

  # Test to see if connection exists, if so return
  if "connections" in json_result:
    for item in json_result["connections"]:
      print(f"BigLake Connection: {item['name']}")
      # "projects/756740881369/locations/us/connections/vertex-ai-notebook-connection"
      # NOTE: We cannot test the complete name since it contains the project number and not id
      if item["name"].endswith(f"/locations/{bigquery_location}/connections/{vertex_ai_connection_name}"):
        print("Connection already exists")
        serviceAccountId = item["cloudResource"]["serviceAccountId"]
        return serviceAccountId

  # Create the connection
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create
  print("Creating Vertex AI Connection")

  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={vertex_ai_connection_name}"

  request_body = {
      # f-string so the friendly name is the connection id itself, not the
      # literal text "{vertex_ai_connection_name}".
      "friendlyName": f"{vertex_ai_connection_name}",
      "description": "Vertex AI Colab Notebooks Connection for BQ",
      "cloudResource": {}
  }

  json_result = restAPIHelper(url, "POST", request_body)

  serviceAccountId = json_result["cloudResource"]["serviceAccountId"]
  print("Vertex AI Connection created: ", serviceAccountId)
  return serviceAccountId

### BQ Functions

In [None]:
def setBigQueryDatasetPolicy(account, role):
  """Sets the BigQuery Dataset IAM policy.

  Reads the access list of dataset DATASET_ID in project PROJECT_ID and, unless
  an entry for the same email and role is already present, patches the dataset
  with that entry appended.

  Args:
    account: Email address stored in the entry's "userByEmail" field.
    role: Dataset role stored in the entry's "role" field, e.g. "OWNER".
  """
  # The same dataset resource URL serves both the read (GET) and the update (PATCH).
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/get
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch
  dataset_url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{PROJECT_ID}/datasets/{DATASET_ID}"

  json_result = restAPIHelper(dataset_url, "GET", None)
  print(f"setBigQueryDatasetPolicy (GET) json_result: {json_result}")

  # Start from the existing entries so that they are not lost by the patch.
  access = json_result.get("access", [])

  # Nothing to do when the account already holds the role.
  for entry in access:
    if "userByEmail" in entry and entry["userByEmail"] == account and entry["role"] == role:
      print("Permissions exist")
      return

  access.append({"role": role, "userByEmail": account})
  print(f"Permission bindings: {access}")

  json_result = restAPIHelper(dataset_url, "PATCH", {"access": access})
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"BigQuery Dataset IAM Permissions set for {account} {role}")


In [None]:
def GetTableSchema(dataset_name, table_name):
  """Returns the schema of a BigQuery table in PROJECT_ID as JSON text.

  Args:
    dataset_name: Dataset that contains the table.
    table_name: Table whose schema is read.

  Returns:
    The text that client.schema_to_json writes for the table's schema.
  """
  import io

  # Client.dataset() is deprecated; get_table accepts a fully-qualified
  # "project.dataset.table" id directly.
  table = client.get_table(f"{PROJECT_ID}.{dataset_name}.{table_name}")

  # schema_to_json writes to a file-like object, so collect the text in memory.
  schema_buffer = io.StringIO()
  client.schema_to_json(table.schema, schema_buffer)
  return schema_buffer.getvalue()

In [None]:
def RunQuery(sql):
  """Runs a SQL statement in BigQuery.

  Statements that start with SELECT or WITH return their rows; every other
  statement is submitted as a job that is polled until it is done.

  Args:
    sql: The SQL text to execute.

  Returns:
    A DataFrame with the result rows for SELECT/WITH statements, otherwise True
    once the job finished without an error.

  Raises:
    Exception: With the job's error_result when the job finished with an error.
  """
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  # Ignore leading whitespace and letter case so that "\nselect ..." is still
  # treated as a row-returning statement.
  statement_start = sql.lstrip()[:6].upper()
  if statement_start.startswith(("SELECT", "WITH")):
    return client.query(sql).to_dataframe()

  job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
  query_job = client.query(sql, job_config=job_config)

  # Check on the progress by getting the job's updated state every 2 seconds.
  while True:
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))
    if query_job.state == "DONE":
      break
    time.sleep(2)

  if query_job.error_result is None:
    return True
  raise Exception(query_job.error_result)

### Kafka Functions

In [None]:
def createApacheKafkaForBigQueryCluster():
  """Creates a Apache Kafka For BigQuery Cluster.

  Lists the clusters in PROJECT_ID / REGION and creates kafka_cluster_name on
  the configured subnet when it is not there yet.

  Returns:
    The cluster resource name
    "projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}",
    both when the cluster already exists and when creation was just requested.

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """
  cluster_resource_name = f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}"

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list

  url = f"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  print(f"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

  # Test to see if cluster exists, if so return
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == cluster_resource_name:
        print("Apache Kafka for BigQuery already exists")
        return cluster_resource_name

  # Create Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create
  print("Creating Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters?clusterId={kafka_cluster_name}"

  # Larger Apache Kafka Cluster
  # vcpuCount: 32 -> You can probably use less CPUs since they are mainly idle
  # memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers

  request_body = {
      "capacityConfig": {
        "vcpuCount": "3",
        "memoryBytes": "3221225472"
      },
      "gcpConfig": {
          "accessConfig": {
              "networkConfigs": {
                  "subnet": f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{subnet}"
                  }
            }
        }
    }

  json_result = restAPIHelper(url, "POST", request_body)

  # Only the name of the returned operation is used. The "done" field is not
  # read: it was never used, and indexing it fails when the response omits it.
  name = json_result["name"]
  print("Apache Kafka for BigQuery created: ", name)
  return cluster_resource_name

In [None]:
def waitForApacheKafkaForBigQueryCluster(operation):
  """
  Waits for an Apache Kafka For BigQuery Cluster to be Created.

  Polls https://managedkafka.googleapis.com/v1/{operation} until the returned
  JSON has "state" equal to "ACTIVE".

  operation:
    Resource path appended to the API base URL. The notebook passes the value
    returned by createApacheKafkaForBigQueryCluster, i.e. a cluster resource
    name such as projects/${project_id}/locations/us-central1/clusters/kafka-cluster

  Raises RuntimeError when the state is still not ACTIVE after 101 polls
  (30 seconds apart).
  """
  poll_url = f"https://managedkafka.googleapis.com/v1/{operation}"
  max_retries = 100

  for attempt in range(max_retries + 1):
    json_result = restAPIHelper(poll_url, "GET", None)
    print(f"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

    if json_result.get("state") == "ACTIVE":
      print("Apache Kafka for BigQuery Cluster created")
      return None

    # Wait 30 seconds before the next poll; no wait after the final poll.
    if attempt < max_retries:
      time.sleep(30)

  raise RuntimeError("Apache Kafka for BigQuery Cluster not created")


In [None]:
def createApacheKafkaForBigQueryTopic():
  """Creates a Apache Kafka For BigQuery Topic.

  Lists the topics of kafka_cluster_name and creates kafka_topic_name with
  6 partitions and replication factor 3 when it is not there yet.

  Returns:
    None, both when the topic already exists and when it was created.

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """
  cluster_path = f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}"

  # First find the topic if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/list

  url = f"https://managedkafka.googleapis.com/v1/{cluster_path}/topics"

  # Gather existing topics
  json_result = restAPIHelper(url, "GET", None)
  # The log label names this function (it used to name the cluster function).
  print(f"createApacheKafkaForBigQueryTopic (GET) json_result: {json_result}")

  # Test to see if topic exists, if so return
  if "topics" in json_result:
    for item in json_result["topics"]:
      print(f"Apache Kafka for BigQuery Topic: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster/topics/topic-name"
      if item["name"] == f"{cluster_path}/topics/{kafka_topic_name}":
        print("Apache Kafka for BigQuery Topic already exists")
        return None

  # Create Apache Kafka For BigQuery Topic
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/create
  print("Creating Apache Kafka For BigQuery Topic")

  url = f"https://managedkafka.googleapis.com/v1/{cluster_path}/topics?topicId={kafka_topic_name}"

  # partition_count 32 -> for larger cluster
  request_body = {
      "partition_count"    : 6,
      "replication_factor" : 3
    }

  json_result = restAPIHelper(url, "POST", request_body)

  name = json_result["name"]
  print("Apache Kafka for BigQuery Topic created: ", name)
  return None

In [None]:
def deleteApacheKafkaForBigQueryCluster():
  """Deletes a Apache Kafka For BigQuery Cluster.

  Lists the clusters in PROJECT_ID / REGION and deletes kafka_cluster_name when
  it is present.

  Returns:
    None in every case.

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """
  cluster_resource_name = f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}"

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list

  url = f"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  # The log label names this function (it used to name the create function).
  print(f"deleteApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")
  found = False

  # Test to see if the cluster exists, if so then delete
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == cluster_resource_name:
        print("Apache Kafka for BigQuery  exists")
        found = True
        break

  if not found:
    print("Apache Kafka for BigQuery does not exist")
    return None

  # Delete Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/delete
  print("Deleting Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/{cluster_resource_name}"

  json_result = restAPIHelper(url, "DELETE", request_body={})

  print("Apache Kafka for BigQuery deleted")

#### Kafka Auth

In [None]:
import json
import base64
import datetime
import http.server
import google.auth
import google.auth.crypt
import google.auth.jwt
import google.auth.transport.urllib3
import urllib3
from kafka.oauth.abstract import AbstractTokenProvider
from kafka.errors import KafkaError

class MyTokenProvider(AbstractTokenProvider):
    """Token provider that builds OAUTHBEARER tokens from Google credentials.

    The token returned by token() is three dot-joined, unpadded base64url
    segments: a fixed header, a JSON claims document and the access token of
    the Application Default Credentials.
    """

    # Credentials are resolved once, when the class body executes, and are
    # shared by every instance.
    _credentials, _project = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform']
    )
    _http_client = urllib3.PoolManager()

    # First segment of every token.
    _HEADER = json.dumps(dict(typ='JWT', alg='GOOG_OAUTH2_TOKEN'))

    def __init__(self, **config):
        # Configuration keyword arguments are accepted and ignored.
        pass

    def valid_credentials(self):
        """Returns the shared credentials, refreshing them first when not valid."""
        if not self._credentials.valid:
            self._credentials.refresh(google.auth.transport.urllib3.Request(self._http_client))
        return self._credentials

    def get_jwt(self, creds):
        """Returns the JSON claims document (exp, iss, iat, sub) for creds.

        Reads creds.expiry and creds.service_account_email.
        NOTE(review): presumably this requires service-account credentials,
        since it reads service_account_email - confirm for user credentials.
        """
        return json.dumps(
            dict(
                exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(),
                iss='Google',
                iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
                sub=creds.service_account_email,
            )
        )

    def b64_encode(self, source):
        """Returns source as URL-safe base64 text with the '=' padding removed."""
        return (
            base64.urlsafe_b64encode(source.encode('utf-8'))
            .decode('utf-8')
            .rstrip('=')
        )

    def get_kafka_access_token(self, creds):
        """Joins the encoded header, claims and access token with dots."""
        return '.'.join(
            [self.b64_encode(self._HEADER), self.b64_encode(self.get_jwt(creds)), self.b64_encode(creds.token)]
        )

    def build_message(self):
        """Returns a token built from valid (refreshed when needed) credentials."""
        # The remaining-lifetime value that used to be computed here was never
        # used, so it is no longer calculated.
        creds = self.valid_credentials()
        return self.get_kafka_access_token(creds)

    def token(self):
        """Returns the token string handed to the Kafka client."""
        return self.build_message()

# Copy Data

In [None]:
# Copy our data (CSV files) from the source bucket into the project's data bucket,
# so that the later LOAD DATA statement reads from our own bucket.
# gsutil flags: -m copies in parallel, -q suppresses progress output, -r copies recursively.
source_path = "gs://data-analytics-golden-demo/cymbal-consumer-finance/*"
dest_path = f"gs://{DATA_BUCKET_NAME}/cymbal-consumer-finance/"
print(f"Copying data from {source_path} to {dest_path}")
print("This may take a few minutes...")
!gsutil -m -q cp -r {source_path} {dest_path}
print("Copy [data] is complete")

# Create Table Scripts

In [None]:
# Create the dataset when it does not exist yet. The dataset id and location come
# from the configuration cell, so they stay consistent with the rest of the notebook.
sql = f"""
CREATE SCHEMA IF NOT EXISTS {DATASET_ID} OPTIONS(location = '{BQ_LOCATION}');
"""

RunQuery(sql)

In [None]:
# Table that the Dataflow job writes the Kafka events into (used as its outputTableSpec).
# One row per event: the customer id plus the transaction or search term.
bigquery_streaming_destination_table = "kafka_events"
sql = f"""
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.{bigquery_streaming_destination_table}`
(
    customer_id STRING NOT NULL OPTIONS(description="customer id"),
    transaction_or_search STRING NOT NULL OPTIONS(description="a user transaction or a search term")
    )
"""

RunQuery(sql)

In [None]:
# Table for the predicted life event of each customer.
bigquery_predicted_event_table = "predicted_life_event"
sql = f"""
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.{bigquery_predicted_event_table}`
(
    customer_id STRING NOT NULL OPTIONS(description="customer id"),
    predicted_life_event STRING NOT NULL OPTIONS(description="ML predicted life event based on search terms")
)
"""

RunQuery(sql)

In [None]:
# Table that maps a transaction or search term to a life event. It is filled by the
# Gemini-generated INSERT statements and later sampled to build the Kafka records.
bigquery_ml_training_table = "transactions_to_life_events"
sql = f"""
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.{bigquery_ml_training_table}`
(
    transaction_or_search   STRING NOT NULL OPTIONS(description="a user transaction or a search term"),
    life_event STRING NOT NULL OPTIONS(description="possible life event related to the transaction or search term"),
)
"""

RunQuery(sql)

In [None]:
# Load (overwriting any existing rows) the customers table from the CSV file that was
# copied into the data bucket; the first CSV row is skipped as a header.
bigquery_customers_table = "customers"
sql = f"""LOAD DATA OVERWRITE `{PROJECT_ID}.{DATASET_ID}.{bigquery_customers_table}`
(
  customer_id STRING,
  first_name STRING,
  last_name STRING,
  date_of_birth DATE,
  email STRING,
  phone_number STRING,
  creation_date DATE,
  life_event STRING
)
FROM FILES (format = 'CSV', skip_leading_rows = 1, uris = ['gs://{DATA_BUCKET_NAME}/cymbal-consumer-finance/ccf_csv_tables_customers.csv']);
"""
RunQuery(sql)

# Run Gemini to generate Synthetic Data

To generate an API key, please follow the steps [here](https://support.google.com/googleapi/answer/6158862?hl=en), then enter the key in the box below.



In [None]:
import getpass

# Do not store the key in the notebook: reuse GEMINI_API_KEY when the environment
# already holds a real value, otherwise prompt for it without echoing the input.
if os.environ.get("GEMINI_API_KEY", "") in ("", "<API_KEY>"):
  os.environ["GEMINI_API_KEY"] = getpass.getpass("Enter your Gemini API key: ")

In [None]:
# Authenticate the google.generativeai client with the key stored in the environment.
genai.configure(api_key=os.environ['GEMINI_API_KEY'])

# Model that generates the synthetic INSERT statements; temperature is set to 0.1.
model = genai.GenerativeModel(model_name='gemini-1.5-pro', generation_config=genai.GenerationConfig(
        temperature=0.1
    ))

In [None]:
# Number of INSERT statements the prompt asks the model to generate.
count = 100

# JSON schema of the training table; it is embedded in the prompt.
schema = GetTableSchema(DATASET_ID, bigquery_ml_training_table)

In [None]:
#TODO: change this hardcoding.
# NOTE: this repeats the life_events list already defined in the configuration cell;
# keep the two lists in sync. The prompt reads this variable for the valid life events.
life_events = ["New Child",
               "Graduation",
               "Relocation",
               "Retirement",
               "Home Purchase",
               "Medical Event",
               "Starting Business",
               "Marriage",
               "Divorce" ]

In [None]:
# Prompt that asks Gemini for INSERT statements against the training table.
prompt=f"""
You are a database engineer and need to generate data for a table for the below schema.
- The schema is for a Google Cloud BigQuery Table.
- The table name is "{PROJECT_ID}.{DATASET_ID}.{bigquery_ml_training_table}".
- Read the description of each field for valid values.
- Do not preface the response with any special characters or 'sql'.
- Generate {count} insert statements for this table.
- Valid values for life events are: {life_events}
- Generate values for insert statement such that each life_event has equal number of rows.
- For each life event create unique transaction or search term related to that event.

Example 1: INSERT INTO `my-dataset.my-dataset.my-table` (field_1, field_2) VALUES (1, 'Sample'),(2, 'Sample');
Example 2: INSERT INTO `my-dataset.my-dataset.my-table` (field_1, field_2) VALUES (1, 'Data'),(2, 'Data'),(3, 'Data');

Schema: {schema}
"""

# Ask the model for SQL and run it, 7 attempts in total. A successful attempt does
# not end the loop, so up to 7 generated batches are inserted.
llm_valid_execution = False
loop_count = 0
while loop_count < 7:
  # Count the attempt before doing any work: a failure inside generate_content
  # (for example an invalid API key) must still use up an attempt, otherwise the
  # loop would never terminate.
  loop_count += 1
  try:
    response_json = model.generate_content(prompt)
    sql = response_json.text

    print("---------------------------------")
    print("sql: ", sql)
    print("---------------------------------")
    llm_valid_execution = RunQuery(sql)
    if not llm_valid_execution:
      print("Error running generated SQL")
      break
  except Exception as error:
    # Report the failure and move on to the next attempt.
    print("An error occurred:", error)

# Create a BQ Gemini Model to help with prediction

In [None]:
# Create (or look up) the BigQuery connection named in the configuration cell and
# keep the service account that BigQuery uses for it.
vertexAIServiceAccountId = createVertexAIConnection(CONNECTION_NAME)
print(f"Vertex AI connection service account: {vertexAIServiceAccountId}")

# Set the required permissions on the external connection's service principal
# To call GENERATE TEXT
setProjectLevelIamPolicy(f"serviceAccount:{vertexAIServiceAccountId}","roles/aiplatform.user")

In [None]:
# Create (or replace) a BigQuery remote model that uses the Vertex AI connection
# and points at the gemini-1.5-flash-002 endpoint.
sql = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{DATASET_ID}.gemini_model`
REMOTE WITH CONNECTION `projects/{PROJECT_ID}/locations/{BQ_LOCATION}/connections/{CONNECTION_NAME}`
OPTIONS (ENDPOINT = 'gemini-1.5-flash-002');
"""
RunQuery(sql)

# Create synthetic data for Kafka

In [None]:
# Fetch up to 3000 distinct customer ids from the customers table.

sql = f"""SELECT DISTINCT customer_id FROM `{PROJECT_ID}.{DATASET_ID}.{bigquery_customers_table}` LIMIT 3000"""
customer_ids_df = RunQuery(sql)

# Fetch up to 1000 distinct transactions/searches from the training table created before.

sql = f"""SELECT DISTINCT transaction_or_search FROM `{PROJECT_ID}.{DATASET_ID}.{bigquery_ml_training_table}` LIMIT 1000"""
searches_df = RunQuery(sql)

In [None]:
import random

# How many synthetic Kafka records to build.
num_records = 10

# Plain Python lists are easier to sample from than DataFrame columns.
searches = searches_df['transaction_or_search'].tolist()
customer_ids = customer_ids_df['customer_id'].tolist()

# Each record pairs one randomly chosen customer with one randomly chosen
# transaction/search term (the customer is drawn first, then the term).
combined_records = []
for _ in range(num_records):
    customer_id = random.choice(customer_ids)
    search = random.choice(searches)
    record = dict(customer_id=customer_id, transaction_or_search=search)
    combined_records.append(record)

# Create Kafka Infra

In [None]:
# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList

# createApacheKafkaForBigQueryCluster returns the cluster resource name (not an
# operation id); the wait function polls that resource until its state is ACTIVE.
opertion = createApacheKafkaForBigQueryCluster()

if opertion is not None:
  waitForApacheKafkaForBigQueryCluster(opertion)

In [None]:
# Create the Kafka topic (kafka_topic_name) on the cluster unless it already exists.

createApacheKafkaForBigQueryTopic()

## Send data to Kafka

In [None]:
# send data to the newly created kafka topic
from kafka import KafkaProducer
from kafka import KafkaConsumer


# Token provider that supplies the OAUTHBEARER tokens for the SASL_SSL connection.
mtp = MyTokenProvider()
# Bootstrap address of the managed cluster (port 9092).
bootstrap_server = f"bootstrap.{kafka_cluster_name}.{REGION}.managedkafka.{PROJECT_ID}.cloud.goog:9092"

# Kafka producer that serializes every value as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=bootstrap_server,
    client_id="one-file-writer",
    api_version=(1, 4, 7),
    api_version_auto_timeout_ms=20000,
    request_timeout_ms=60000,
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=mtp,
    value_serializer=lambda payload: json.dumps(payload).encode("utf-8"),
)

In [None]:
# Publish every generated record to the topic; the producer's value_serializer
# turns each dict into JSON bytes.
for record in combined_records:
  producer.send(kafka_topic_name, record)

# Push out anything still buffered, then release the producer.
producer.flush()
producer.close()
print("Sent all messages to Kafka")

## Test reading Kafka data

In [None]:
# Test reading the Kafka messages back from the topic.

# Create a Kafka consumer with the same connection and authentication settings as the producer.
consumer = KafkaConsumer( kafka_topic_name,  # topic to consume from
    bootstrap_servers=bootstrap_server,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=mtp,
    api_version=(1, 4, 7),  # same API version as the producer
    api_version_auto_timeout_ms=20000,
    request_timeout_ms=60000,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),  # decode the JSON written by the producer
    auto_offset_reset='earliest', # 'earliest' here; 'latest' is the alternative
    enable_auto_commit=True, # offsets are committed automatically
    consumer_timeout_ms=10000 # How long to wait for messages before timing out.
)

try:
    # Iteration ends once no message arrives within consumer_timeout_ms.
    for message in consumer:
        # Process the received message
        print(f"Received message: {message.value}")
        # ... your message processing logic here ...

except KeyboardInterrupt:
    print("Consumer stopped by user.")

finally:
    # Always close the consumer, also after an interrupt.
    consumer.close()
    print("Consumer closed.")


# Stream kafka events to BQ via Dataflow

WARNING: This will create a new job every time it is run. The notebook will only stop the latest job, so please check the Dataflow UI to cancel any additional jobs.


In [None]:
def createDataflowJobApacheKafkaToBigQuery(jobName):
  """Creates a DataFlow job to copy data from Apache Kafka for BiqQuery to stream data into a BigQuery Table

  Lists the Dataflow jobs in PROJECT_ID / REGION and, when no job with this
  name is listed, launches the Kafka_to_BigQuery_Flex template.

  Args:
    jobName: Name of the Dataflow job to create.

  Returns:
    The "job" object of the launch response, or None when a job with this name
    is already listed (whatever its state).

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list

  url = f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/jobs?location={REGION}"

  # Gather existing job
  json_result = restAPIHelper(url, "GET", None)
  print(f"createDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}")

  # Test to see if job exists, if so return
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']}")
      if item["name"] == jobName:
        # The value printed after "date of" is the job's currentState.
        print(f"DataFlow job already exists with date of {item['currentState']}.  Try a new name.")
        return None

  # Create DataFlow Job
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch
  print("Creating DataFlow Job from Flex Template")

  url = f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/locations/{REGION}/flexTemplates:launch"

  # Continuous Queries needs useStorageWriteApiAtLeastOnce = True
  # https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery#optional-parameters
  #numStorageWriteApiStreams : Specifies the number of write streams, this parameter must be set. Default is 0.
  #storageWriteApiTriggeringFrequencySec : Specifies the triggering frequency in seconds, this parameter must be set. Default is 5 seconds.
  #useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.

  # The job reads from the notebook's Kafka topic starting at the latest offset and
  # writes JSON messages into the streaming destination table; it runs as
  # dataflow_service_account on the configured subnet and uses dataflow_bucket for
  # its staging and temp files.
  request_body = {
    "launch_parameter": {
        "jobName": jobName,
        "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex",
        "parameters": {
            "readBootstrapServerAndTopic": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}",
            "persistKafkaKey": "false",
            "writeMode": "SINGLE_TABLE_NAME",
            "numStorageWriteApiStreams": "2",
            "useStorageWriteApiAtLeastOnce": "true",
            "storageWriteApiTriggeringFrequencySec": "5",
            "enableCommitOffsets": "false",
            "kafkaReadOffset": "latest",
            "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
            "messageFormat": "JSON",
            "useBigQueryDLQ": "false",
            "stagingLocation": f"gs://{dataflow_bucket}/staging",
            "autoscalingAlgorithm": "NONE",
            "serviceAccount": dataflow_service_account,
            "usePublicIps": "false",
            "labels": "{\"goog-dataflow-provided-template-type\":\"flex\",\"goog-dataflow-provided-template-name\":\"kafka_to_bigquery_flex\"}",
            "outputTableSpec": f"{PROJECT_ID}:{DATASET_ID}.{bigquery_streaming_destination_table}"
        },
        "environment": {
            "numWorkers": 2,
            "tempLocation": f"gs://{dataflow_bucket}/tmp",
            "subnetwork": f"regions/{REGION}/subnetworks/{subnet}",
            "enableStreamingEngine": True,
            "additionalExperiments": [
                "enable_streaming_engine"
            ],
            "additionalUserLabels": {}
        }
    }
}

  json_result = restAPIHelper(url, "POST", request_body)

  job = json_result["job"]
  print("Dataflow job for kakfa->BQ created: ", job)
  return job

In [None]:
def stopDataflowJobApacheKafkaToBigQuery(jobName):
  """Stops a DataFlow job to copy data from Apache Kafka for BiqQuery to stream data into a BigQuery Table

  Looks up the first listed job with this name that is in state
  JOB_STATE_RUNNING and requests JOB_STATE_CANCELLED for it.

  Args:
    jobName: Name of the Dataflow job to stop.

  Returns:
    None, whether or not a running job was found.
  """
  # List the jobs of the project in REGION
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
  list_url = f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/jobs?location={REGION}"
  json_result = restAPIHelper(list_url, "GET", None)
  print(f"stopDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}")

  # Walk the listing until the first running job with the requested name.
  running_job_id = None
  for job in json_result.get("jobs", []):
    print(f"DataFlow Job Name: {job['name']} - {job['currentState']}")
    if job["name"] == jobName and job["currentState"] == "JOB_STATE_RUNNING":
      running_job_id = job["id"]
      break

  if running_job_id is None:
    print("DataFlow not found or is not running.")
    return

  # Ask Dataflow to cancel the job
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update
  print("Stopping DataFlow Job ")

  job_url = f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/locations/{REGION}/jobs/{running_job_id}"
  print(job_url)

  json_result = restAPIHelper(job_url, "PUT", {"requestedState": "JOB_STATE_CANCELLED"})

  print("DataFlow Job Stopped")
  return

In [None]:
def waitForDataFlowJobToStart(jobName):
  """Blocks until the Dataflow job called `jobName` reports JOB_STATE_RUNNING.

  The job is looked up by name in the job list of PROJECT_ID / REGION, then
  its resource is polled every 30 seconds (at most 100 times after the first
  check).

  Args:
    jobName: Name of the Dataflow job to wait for.

  Returns:
    None once the job is running. Also returns None, after printing a
    message, when no job with that name is listed.

  Raises:
    RuntimeError: If the job reaches a terminal state (failed, cancelled,
      drained, updated or done) instead of running, or if it is still not
      running when the retry budget is exhausted.
  """

  # States a Dataflow job cannot leave; once one of these is reported the job
  # will never become JOB_STATE_RUNNING, so polling further is pointless.
  terminal_states = {
      "JOB_STATE_FAILED",
      "JOB_STATE_CANCELLED",
      "JOB_STATE_DRAINED",
      "JOB_STATE_UPDATED",
      "JOB_STATE_DONE",
  }

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list

  url = f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/jobs?location={REGION}"

  # Gather existing jobs
  json_result = restAPIHelper(url, "GET", None)
  print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")
  found = False

  # Take the first listed job with a matching name, whatever its state
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']} - {item['currentState']}")
      if item["name"] == jobName:
        jobId = item["id"]
        found = True
        break

  if not found:
    print(f"DataFlow Job {jobName} not found.")
    return

  # Gets the job status
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get
  print("Getting DataFlow Job ")
  url=f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/locations/{REGION}/jobs/{jobId}"
  print(url)

  max_retries = 100
  poll_interval_seconds = 30
  attempt = 0

  while True:
    # Get Job
    json_result = restAPIHelper(url, "GET", None)

    if "currentState" in json_result:
      current_state = json_result["currentState"]
      print(f"waitForDataFlowJobToStart (GET) currentState: {current_state}")
      if current_state == "JOB_STATE_RUNNING":
        print("DataFlow Job is now running")
        return None
      if current_state in terminal_states:
        # Fail fast instead of polling a dead job until the retry budget runs out
        raise RuntimeError(
            f"DataFlow Job {jobName} ended in state {current_state} before it started running")
    else:
      # Response without a state (e.g. an error payload); show it and keep polling
      print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")

    # Give up once the retry budget is spent, otherwise wait before the next poll
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError(
          f"DataFlow Job {jobName} did not reach JOB_STATE_RUNNING after {max_retries} retries")
    time.sleep(poll_interval_seconds)

In [None]:
# Give the Dataflow service account OWNER access on the BigQuery dataset.
setBigQueryDatasetPolicy(dataflow_service_account, "OWNER")

In [None]:
# Launch the Kafka -> BigQuery Dataflow job under a timestamped name.
# Start-up can take a few minutes; progress is visible at:
# https://console.cloud.google.com/dataflow/jobs

launch_stamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
jobName = "kafka-stream-" + launch_stamp
createDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
# Block until the Dataflow job named by jobName reports JOB_STATE_RUNNING.
waitForDataFlowJobToStart(jobName)

# Use continuous queries to detect life events with BigQuery ML

In [None]:
# Optionally create the BigQuery reservation and the CONTINUOUS-job assignment
# used by continuous queries. Nothing is created unless the answer is exactly "Y".

start_billing_prompt = "Do you want to start BigQuery Reservations? This will START billing and will continue until you remove these in the Clean Up code (Y/n)?"
user_input = input(start_billing_prompt)
if user_input == "Y":
  # Ordered: the reservation first, then the assignment that lives inside it.
  reservation_statements = (
      f"""CREATE RESERVATION `{PROJECT_ID}.region-US.continuous-query-reservation`
            OPTIONS (edition = "enterprise",
                     slot_capacity = 50);
  """,
      f"""CREATE ASSIGNMENT `{PROJECT_ID}.region-US.continuous-query-reservation.continuous-query-reservation-assignment`
            OPTIONS(assignee = "projects/{PROJECT_ID}",
                    job_type = "CONTINUOUS");
  """,
  )
  for sql in reservation_statements:
    RunQuery(sql)

**Run each of the queries below**

* Copy the SQL to a BigQuery SQL Window
* Under the More menu, select Continuous Query
* Under the Query settings, under Continuous query, select kafka-continuous-query for the service account
* Run the Query (it will take a minute to start)

In [None]:
# Continuous query that predicts a life event for each row of the streamed data.
# The statement is only built and printed here; copy the printed SQL into a
# BigQuery SQL window and run it as a continuous query.
#
# Prompt notes:
# * A ". " separator follows the customer's keywords so they are not fused with
#   the next sentence of the prompt.
# * The worked example uses 'Home Purchase', which is one of the allowed life
#   events, so the example does not teach the model a label outside the list.

sql = f"""
INSERT INTO `{PROJECT_ID}.{DATASET_ID}.predicted_life_event` (


with streaming_data AS (
SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.kafka_events`
),
prediction AS (
  SELECT *
    FROM ML.GENERATE_TEXT(MODEL`{PROJECT_ID}.{DATASET_ID}.gemini_model`,
          (SELECT customer_id,transaction_or_search,
                             CONCAT("Generate a life event from the give set of keywords that a customer used for searching. for example, a keyword such ",
                                    "'best sofa for living room' may suggest the customer bought a new house. so the life event is 'Home Purchase'. ",
                                    "The kewords for which you need to predict the life event is: ", transaction_or_search,
                                    ". Only possible life events are: New Child,Graduation,Relocation,",
                                    "Retirement, Home Purchase,Medical Event,Starting Business,Marriage,Divorce. ",
                                    "ouput the life event as the only output, strip all other result sentences") AS prompt
            FROM streaming_data),
          STRUCT(.8 AS temperature, .8 AS top_p)
          )
)

select customer_id,
STRING(ml_generate_text_result.candidates[0].content.parts[0].text) as predicted_life_event
from prediction

)


"""

# Show the rendered statement (project and dataset substituted) so it can be copied.
print(sql)

# Clean Up

In [None]:
# Cancel the Dataflow job started earlier in this notebook (only on an exact "Y").
# If jobName is unknown or was overwritten, cancel the job manually here:
# https://console.cloud.google.com/dataflow/jobs

stop_job_prompt = f"Do you want to delete your DataFlow Job {jobName} (Y/n)?"
user_input = input(stop_job_prompt)
if user_input == "Y":
  stopDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
# Delete the Apache Kafka for BigQuery cluster (only on an exact "Y").
delete_cluster_prompt = "Do you want to delete your Apache Kafka for BigQuery (Y/n)?"
user_input = input(delete_cluster_prompt)
if user_input == "Y":
  deleteApacheKafkaForBigQueryCluster()

In [None]:

# Drop the continuous-query assignment and reservation so slot billing stops.
# Both were created under the `region-US` qualifier, so the DROP statements must
# use the same qualifier. REGION ("us-central1") is not where the reservation
# lives, and `region-{REGION}` would address a reservation that does not exist.
user_input = input("Do you want to delete your BigQuery Reservations. This will STOP billing! (Y/n)?")
if user_input == "Y":
  # The assignment is removed before the reservation that contains it.
  sql = f"DROP ASSIGNMENT `{PROJECT_ID}.region-US.continuous-query-reservation.continuous-query-reservation-assignment`;"
  RunQuery(sql)
  sql = f"DROP RESERVATION `{PROJECT_ID}.region-US.continuous-query-reservation`;"
  RunQuery(sql)