### <font color='#4285f4'>Overview</font>

This notebook empowers marketers to create hyper-personalized emails with unique marketing text and images generated for each customer. By analyzing current trends in Germany related to Automotive Industry and Car Insurance; and leveraging Gemini's advanced capabilities, we identify relevant trends and tailor messages to customers' interests, even translating content and providing transparent explanations for all AI-generated elements.

Process Flow:
1. Used GIGA app shown by gTech colleagues to create insights using these 3 keywords: car insurance, insurance coverage, electric vehicles [English language and Germany Location]
2. Used Gemini to extract relevant keywords from these Insights 
3. Generate a marketing message based upon the customer's profile. Determine their current policy and any specifics that would help tailor the message, combine it with keywords and timeliness
4. Pass the marketing message and ask Gemini to create a prompt for Imagen3
    * a. Gemini will author our Imagen3 prompt for us.
5. Generate the image based upon the image prompt Gemini created
6. Pass the marketing message, user profile and generated image to Gemini Vision.
    * a. Ask Gemini to verify that the image satisfies the original request.
7 Ask Gemini to translate the marketing text into another language.
    * a. Ask Gemini to verify the translation to make sure it did what we asked
8. Create the HTML email message.

Notes:
* We could also create campaigns based on customer segmentation in place 
* Combine it with Keyword Insights to create timely campaigns for relevant Audiences

Cost:
* Very Small: A few calls to Gemini, Imagen3 and BigQuery
* Medium: Remember to stop your Colab Enterprise Notebook Runtime

In [None]:
! pip install -r requirements.txt -q

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs
import sys

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import base64
import random

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

from google.cloud import bigquery
from google import genai
from google.cloud import storage

Add location, storage bucket, project and user details below

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "TO_DO_DEVELOPER"
location = "TO_DO_DEVELOPER"
storage_account = "TO_DO_DEVELOPER"
project_id = "TO_DO_DEVELOPER"
dataset_id = "TO_DO_DEVELOPER"
user = "TO_DO_DEVELOPER"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

gemini_languages = ["Arabic (ar)",  "Bengali (bn)",  "Bulgarian (bg)",  "Chinese simplified (zh)",  "Chinese traditional (zh)",
  "Croatian (hr)",  "Czech (cs)",  "Danish (da)",  "Dutch (nl)",  "Estonian (et)",  "Finnish (fi)",  "French (fr)",
  "German (de)",  "Greek (el)",  "Hebrew (iw)",  "Hindi (hi)",  "Hungarian (hu)",  "Indonesian (id)",  "Italian (it)",  "Japanese (ja)",
  "Korean (ko)",  "Latvian (lv)",  "Lithuanian (lt)",  "Norwegian (no)",  "Polish (pl)",  "Portuguese (pt)",  "Romanian (ro)",  "Russian (ru)",
  "Serbian (sr)",  "Slovak (sk)",  "Slovenian (sl)",  "Spanish (es)",  "Swahili (sw)",  "Swedish (sv)",  "Thai (th)",  "Turkish (tr)",  "Ukrainian (uk)",
  "Vietnamese (vi)"]

print(f"project_id = {project_id}")
print(f"dataset_id = {dataset_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current users credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:
  """Calls the Google Cloud REST API passing in the current users credentials"""

  import requests
  import google.auth
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
    #image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
  else:
    error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RetryCondition (for retrying LLM calls)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False

#### Imagen3 Image Generation

In [None]:
def ImageGen(prompt):
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  model_version = "imagen-3.0-generate-001" # imagen-3.0-fast-generate-001
  #model_version = "imagen-3.0-generate-preview-0611" # Preview Access Model

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation
  # url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/imagegeneration:predict"
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/{model_version}:predict"

  payload = {
    "instances": [
      {
        "prompt": prompt
      }
    ],
    "parameters": {
      "sampleCount": 1,
      "personGeneration" : "dont_allow"  # change to allow_adult for people generation
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    response_json = json.loads(response.content)
    print(f"Imagen3 response_json: {response_json}")

    if "blocked" in response_json:
      print(f"Blocked: {response_json['blocked']}")

    if "predictions" in response_json:
      image_data = response_json["predictions"][0]["bytesBase64Encoded"]
      image_data = base64.b64decode(image_data)
      filename= str(uuid.uuid4()) + ".png"
      with open(filename, "wb") as f:
        f.write(image_data)
      print(f"Image generated OK.")
      return filename
    else:
      raise RuntimeError(f"No predictions in response: {response.content}")
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)

#### Gemini LLM (Pro 1.0 , Pro 1.5)

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-1.5-pro-001", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models
  # model = "gemini-1.5-pro-001"
  # model = "gemini-pro" # This does support topK (but the function is more generic)
  # model = "gemini-1.0-pro" # This does not support response_schema

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add inthe response schema for when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-pro" or model == "gemini-1.0-pro" or model == "gemini-1.0-pro-vision-001":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError("No text in part: {response.content}")
            else:
              raise RuntimeError("No parts in content: {response.content}")
          else:
            raise RuntimeError("No parts in content: {response.content}")
        else:
          raise RuntimeError("No content in candidate: {response.content}")
      else:
        raise RuntimeError("No candidates: {response.content}")
    else:
      raise RuntimeError("No candidates: {response.content}")

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM_VerifyImage(prompt, imageBase64, model = "gemini-1.5-pro-001", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models
  # model = "gemini-1.5-pro-001"
  # model = "gemini-pro" # This does support topK (but the function is more generic)
  # model = "gemini-1.0-pro" # This does not support response_schema

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add inthe response schema for when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-pro" or model == "gemini-1.0-pro" or model == "gemini-1.0-pro-vision-001":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": [
          { "text": prompt },
          { "inlineData": {  "mimeType": "image/png", "data": f"{imageBase64}" } }
        ]
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError("No text in part: {response.content}")
            else:
              raise RuntimeError("No parts in content: {response.content}")
          else:
            raise RuntimeError("No parts in content: {response.content}")
        else:
          raise RuntimeError("No content in candidate: {response.content}")
      else:
        raise RuntimeError("No candidates: {response.content}")
    else:
      raise RuntimeError("No candidates: {response.content}")

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

#### Helper Functions

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result == None:
      return True
    else:
      raise Exception(query_job.error_result)

In [None]:
def convert_png_to_base64(image_path):
  image = cv2.imread(image_path)

  # Convert the image to a base64 string.
  _, buffer = cv2.imencode('.png', image)
  base64_string = base64.b64encode(buffer).decode('utf-8')

  return base64_string

In [None]:
def PrettyPrintJson(json_string):
  json_object = json.loads(json_string)
  json_formatted_str = json.dumps(json_object, indent=2)
  return json_formatted_str

In [None]:
def GetNextPrimaryKey(fully_qualified_table_name, field_name):
  from google.cloud import bigquery
  client = bigquery.Client()
  sql = f"""
  SELECT IFNULL(MAX({field_name}),0) AS result
    FROM `{fully_qualified_table_name}`
  """
  # print(sql)
  df_result = client.query(sql).to_dataframe()
  # display(df_result)
  return df_result['result'].iloc[0] + 1

In [None]:
# This was generated by GenAI

def copy_file_to_gcs(local_file_path, bucket_name, destination_blob_name):
  """Copies a file from a local drive to a GCS bucket.

  Args:
      local_file_path: The full path to the local file.
      bucket_name: The name of the GCS bucket to upload to.
      destination_blob_name: The desired name of the uploaded file in the bucket.

  Returns:
      None
  """

  import os
  from google.cloud import storage

  # Ensure the file exists locally
  if not os.path.exists(local_file_path):
      raise FileNotFoundError(f"Local file '{local_file_path}' not found.")

  # Create a storage client
  storage_client = storage.Client()

  # Get a reference to the bucket
  bucket = storage_client.bucket(bucket_name)

  # Create a blob object with the desired destination path
  blob = bucket.blob(destination_blob_name)

  # Upload the file from the local filesystem
  content_type = ""
  if local_file_path.endswith(".html"):
    content_type = "text/html; charset=utf-8"

  if local_file_path.endswith(".json"):
    content_type = "application/json; charset=utf-8"

  if content_type == "":
    blob.upload_from_filename(local_file_path)
  else:
    blob.upload_from_filename(local_file_path, content_type = content_type)

  print(f"File '{local_file_path}' uploaded to GCS bucket '{bucket_name}' as '{destination_blob_name}.  Content-Type: {content_type}'.")

### <font color='#4285f4'>Create BigQuery Tables</font>

In [None]:
customer_hyper_personalised_email_sql = f"""
CREATE TABLE IF NOT EXISTS `{dataset_id}.customer_hyper_personalised_email`
(
    customer_id                                    STRING  NOT NULL OPTIONS(description="Primary key."),
    email_date                                     DATE    NOT NULL OPTIONS(description="The date of the email."),

    llm_marketing_prompt                           STRING  OPTIONS(description="The prompt to generate the marketing email text."),
    llm_marketing_prompt_response_json             JSON    OPTIONS(description="The response from the llm marketing prompt in json."),
    llm_marketing_prompt_response_text             STRING  OPTIONS(description="The response from the llm marketing prompt in text."),

    llm_orginial_image_prompt                      STRING  OPTIONS(description="The prompt to generate the original image text."),
    llm_orginial_image_prompt_response_json        JSON    OPTIONS(description="The response from the llm original prompt in json."),
    llm_orginial_image_prompt_response_text        STRING  OPTIONS(description="The response from the llm original prompt in text."),

    llm_improved_image_prompt                      STRING  OPTIONS(description="The prompt to generate the improved image text."),
    -- The improved prompt will be passed to Imagen3
    --llm_improved_image_prompt_response_json      JSON    OPTIONS(description="The response from the llm improved prompt in json."),
    --llm_improved_image_prompt_response_text      STRING  OPTIONS(description="The response from the llm improved prompt in text."),

    llm_verify_image_prompt                        STRING  OPTIONS(description="The prompt to verify the generated image."),
    llm_verify_image_response_json                 JSON    OPTIONS(description="The response from verify the generated image in json."),
    llm_verify_image_text                          STRING  OPTIONS(description="The response from verify the generated image in text."),

    llm_translation_language_prompt                STRING  OPTIONS(description="The prompt to generate the secondary lanagage text."),
    llm_translation_language_prompt_response_json  JSON    OPTIONS(description="The response from the llm secondary lanagage prompt in json."),
    llm_translation_language_prompt_response_text  STRING  OPTIONS(description="The response from the llm secondary lanagage prompt in text."),

    llm_validate_translation_prompt                STRING  OPTIONS(description="The prompt to generate the vadiation of the translation text."),
    llm_validate_translation_prompt_response_json  JSON    OPTIONS(description="The response from the llm vadiation of the translation prompt in json."),
    llm_validate_translation_prompt_response_text  STRING  OPTIONS(description="The response from the llm vadiation of the translation prompt in text."),

    image_gcs_filename                             STRING  OPTIONS(description="The GCS path for the marketing campaign image."),
    image_http_url                                 STRING  OPTIONS(description="The HTTP path for the marketing campaign image."),
    image_generated                                BOOLEAN OPTIONS(description="Has the image been generated and saved to GCS."),
    image_verified                                 BOOLEAN OPTIONS(description="Did the image pass verification."),

    html_gcs_filename                              STRING  OPTIONS(description="The GCS path for the marketing campaign HTML file."),
    html_http_url                                  STRING  OPTIONS(description="The HTTP path for the marketing campaign HTML file."),
    html_generated                                 BOOLEAN OPTIONS(description="Has the HTML been generated and saved to GCS."),
    translation_verified                           BOOLEAN OPTIONS(description="Did the translation pass verification."),
)
CLUSTER BY customer_id;"""

In [None]:

client = bigquery.Client()
client.query_and_wait(customer_hyper_personalised_email_sql)

### <font color='#4285f4'>GIGA app presented by gTech team is used here to check the Trends and Insights for Car Insurance; and Gemini is used to extract keywords from those to be used for hypersonalised assets creation:</font> 

Decoding Car Insurance, Electric Vehicle, and Insurance Coverage Search Trends: Insights for Marketing & Strategy
This analysis examines the provided Google Ads keywords related to car insurance, electric vehicles, and general insurance coverage, sorted by descending YoY growth rate, to uncover consumer trends and potential marketing opportunities.

Overall Trends:
Explosive Growth in Specific Insurance Needs: Several keywords related to specific insurance needs, such as "liability insurance only" and "best car and home insurance", show extremely high growth. This suggests consumers are actively seeking very specific coverage options and are comparison shopping.
Strong Interest in Affordable Options: Keywords related to affordability, such as "most affordable car insurance" and "cheap medical insurance" are experiencing significant growth, indicating a strong consumer focus on cost-effectiveness.
Growing Demand for Medical Insurance: Several medical insurance related keywords are experiencing high growth, such as "medical insurance for low income" and "medical insurance benefits".
Continued Interest in Electric Vehicles: While not as explosive as some insurance-related terms, there's consistent growth in searches related to electric vehicles (EVs), including specific models and charging solutions.

### <font color='#4285f4'>Use Gemini to extract the Trends Keywords and use them further in hyper personalized assets</font>

In [None]:
from collections import Counter
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.tag import pos_tag
from string import punctuation

# Download required NLTK data (run once)
try:
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('averaged_perceptron_tagger')
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('punkt')
    nltk.download('averaged_perceptron_tagger')
    nltk.download('stopwords')
    nltk.download('punkt_tab')
    nltk.download('averaged_perceptron_tagger_eng')

In [None]:
def extract_keywords(text, method='basic', num_keywords=10):
    """
    Extract keywords from text using different methods
    
    Parameters:
    - text: input text
    - method: 'basic', 'pos', or 'phrases'
    - num_keywords: number of keywords to return
    
    Returns:
    - list of keywords
    """
    
    # Convert to lowercase and tokenize
    tokens = word_tokenize(text.lower())
    
    # Remove stopwords and punctuation
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words and word not in punctuation]
    
    if method == 'basic':
        # Simple frequency-based approach
        word_freq = Counter(tokens)
        keywords = [word for word, _ in word_freq.most_common(num_keywords)]
        
    elif method == 'pos':
        # Parts of speech based approach (focus on nouns and adjectives)
        tagged = pos_tag(tokens)
        important_words = [word for word, tag in tagged if tag.startswith(('NN', 'JJ'))]
        word_freq = Counter(important_words)
        keywords = [word for word, _ in word_freq.most_common(num_keywords)]
        
    elif method == 'phrases':
        # Extract meaningful phrases (bigrams)
        bigrams = nltk.bigrams(tokens)
        bigram_freq = Counter(bigrams)
        keywords = [f"{w1} {w2}" for (w1, w2), _ in bigram_freq.most_common(num_keywords)]
    
    return keywords

In [None]:
# Your trends text
trends_text = """
Specific insurance needs, such as liability insurance only and best car and home insurance. This suggests consumers are actively seeking very specific coverage options and are comparison shopping.
Strong Interest in Affordable Options such as most affordable car insurance and cheap medical insurance, indicating a strong consumer focus on cost-effectiveness.
Growing Demand for Medical Insurance particularly medical insurance for low income and medical insurance benefits.
There's consistent growth in searches related to electric vehicles (EVs), including specific models and charging solutions.
"""

# Extract keywords (choose the method that works best for your needs)
keywords = extract_keywords(trends_text, method='phrases', num_keywords=15)

# Format for prompts
prompt_keywords = ", ".join(keywords)

# Use in your prompt
prompt = f"Use these keywords to create a marketing message where applicable: {prompt_keywords}"

In [None]:
display(prompt)

### <font color='#4285f4'>Natural Language search to find the customers for which we want to create hyper personalised emails</font>


In [None]:

USER_QUERY = "young driver with a Standard coverage high-value customer, but with low engagement with digital channels"
sql = f"""
WITH customer_ids AS (
SELECT
  distance,
  base.customer_id,
FROM VECTOR_SEARCH(
  -- base table or subquery
  (
    SELECT * FROM `{dataset_id}.customer_description_embeddings`
  ),

  -- embedding column to search in base table - must be of type ARRAY
  'customer_description_embedding',

  -- query table or subquery - this is where you generate the search embedding
  (
    SELECT ml_generate_embedding_result, content AS query
    FROM ML.GENERATE_EMBEDDING(
      MODEL `{dataset_id}.google-textembedding`,
        (
          -- Search term
          SELECT "{USER_QUERY}" AS content
        ),
        STRUCT(
          TRUE AS flatten_json_output,
          'SEMANTIC_SIMILARITY' as task_type,
          768 AS output_dimensionality
        )
    )
  ),
  top_k => 10,
  distance_type => 'COSINE'
))
SELECT c.customer_id,first_name,d.customer_description 
FROM `{dataset_id}.customers` c
JOIN `{dataset_id}.policies` p
ON  c.customer_id = p.customer_id
LEFT JOIN `{dataset_id}.customer_descriptions` d
ON c.customer_id = d.customer_id
WHERE c.customer_id IN (SELECT customer_id FROM customer_ids)
"""

client = bigquery.Client()
customers = client.query_and_wait(sql).to_dataframe()
customers.head()

In [None]:
customer_list = []
for row in customers.itertuples():
  customer_dict = {
    "customer_id" : row.customer_id,
    "customer_name" : row.first_name,
    "customer_description" : row.customer_description
  }
  print(f"Customer: {customer_dict}")
  customer_list.append(customer_dict)

# Just the Ids so we can query
customer_id_list = ([customer['customer_id'] for customer in customer_list])
customer_id_list_str = (', '.join(map(str, customer_id_list)))




### <font color='#4285f4'>Generate and run the LLM Marketing Prompt</font>

In [None]:
# In case you re-run (placeholder for delete)

##sql = f"""DELETE
           ## FROM `{dataset_id}.customer_hyper_personalised_email`
           ##WHERE customer_id IN ('{customer_id_list_str}')
           ##AND email_date = CURRENT_DATE();"""

##RunQuery(sql)*/

In [None]:
# For each customer, generate the marketing prompt and run against Gemini

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "customer_id" : 0,
#    "email_subject" : "text",
#    "marketing_text" : "text",
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "customer_id",
    "email_subject",
    "marketing_text",
    "explanation"
  ],
  "properties": {
    "customer_id": {
      "type": "string"
    },
    "email_subject": {
      "type": "string"
    },
    "marketing_text": {
      "type": "string"
    },
    "explanation": {
      "type": "string"
    }
  }
}

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  retry = 0
  success = False
  while not success:
    try:
      prompt = f"""You are an experienced insurance marketing analyst specializing in customer personalised reachout at Allianz Insurance, a Germany-based automobile insurance company. 
        Your task is to craft a highly personalized email for customer {item['customer_name']}
        Leverage the following customer data:
        Customer id: {item['customer_id']}
        Customer Description: {item['customer_description']}
        Context: Create a reach out email to this customer to engage with them effectively and suggest product upgrades based on their description.
        Goal: Create a persuasive email that encourages the customer to visit Allianz website and upgrade their insurance. Use the same customer id as input
        Format: Please provide a complete, ready-to-send email, tailored to the specific customer and next steps. Avoid using placeholders or templates.
        Additional keywords: {prompt_keywords}
      """

      print(prompt)
      llm_result = GeminiLLM(prompt,response_schema=response_schema)
      print(llm_result)
      json_result = json.loads(llm_result)
      result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")
      #json_text = json_result['marketing_text'].replace("'","\'")

      # Save to database
      try:

        sql=f"""INSERT INTO `{dataset_id}.customer_hyper_personalised_email`
                            (customer_id, email_date, llm_marketing_prompt, llm_marketing_prompt_response_json, llm_marketing_prompt_response_text)
                    VALUES ('{item['customer_id']}', CURRENT_DATE(), \"\"\"{prompt}\"\"\", JSON\"\"\"{result_escaped_quotes}\"\"\",\"\"\"{json_result['marketing_text']}\"\"\")"""
        #print(sql)
        RunQuery(sql)

        # Jump out of loop
        item['marketing_text'] = json_result['marketing_text']
        item['email_subject'] = json_result['email_subject']
        success = True

        print("---------------------------------------------------------------------------------------")
        print(f"LLM Marketing Prompt [Success] for Customer {item['customer_id']}")
        print("---------------------------------------------------------------------------------------")

      except Exception as e:
        retry += 1
        print("---------------------------------------------------------------------------------------")
        print(f"LLM Marketing Prompt [SQL Error] for Customer {item['customer_id']}: {sql}")
        print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"LLM Marketing Prompt [Error] for Customer {item['customer_id']}: {e}")
      print("---------------------------------------------------------------------------------------")

    if retry > 5:
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      print(f"LLM Marketing Prompt [Retry Limit Reached - Skipping] for Customer {item['customer_id']}")
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      break # Skip this customer

In [None]:
print("Number of customers:", len(customer_list))
print("Customer list contents:", customer_list)

### <font color='#4285f4'>Create an image prompt and enhance it by running it through Gemini</font>

In [None]:
# For each customer, generate an improved image prompt using Gemini

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "customer_id" : 0,
#    "image_prompt" : "text"
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "customer_id",
    "image_prompt",
    "explanation"
  ],
  "properties": {
    "customer_id": {
      "type": "string"
    },
    "image_prompt": {
      "type": "string"
    },
    "explanation": {
      "type": "string"
    }
  }
}

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  retry = 0
  success = False
  while not success:
    try:
      prompt = f"""You are a marketing expert at Allianz Insurance, a Germany-based automobile insurance company, your task is to craft a highly personalized image.
      You need to send out a hyper-personalized email to a customer {item['customer_name']}
      Generate a LLM Prompt to generate a marketing image based upon the customer's profile and the marketing message we are sending in the email.
      Do not show any damaged cars or damaged objects.
      The image can include their current car and any relevant objects as per personalized email text.
      We want the image to be specific to this customer and their car insurance.
      Think creatively and use the customer's interests to create a unique image.
      Make sure you state the image should be photo realistic.
      Avoid and copyrighted names or objects such as sporting teams names.
      Avoid mentioning any celebrity names or specific people.
      Do not include references to kids or children in the image prompt.
      Only audits can be rendered by the image process.
      This this through step by step.
      Double check for kids, children, or copyrighted sports teams names.

      Customer's profile:
        Customer id: {item['customer_id']}
        Customer Name: {item['customer_name']}
        Customer Description: {item['customer_description']}

      Marketing Message: {item['marketing_text']}
        """

      print(prompt)
      llm_result = GeminiLLM(prompt,response_schema=response_schema)
      print(llm_result)
      json_result = json.loads(llm_result)
      result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")

      # Save to database
      try:
        sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                   SET llm_orginial_image_prompt = \"\"\"{prompt}\"\"\",
                       llm_orginial_image_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                       llm_orginial_image_prompt_response_text = \"\"\"{json_result['image_prompt']}\"\"\",
                       llm_improved_image_prompt = \"\"\"{json_result['image_prompt']}\"\"\"
                 WHERE customer_id = '{item['customer_id']}'"""

        #print(sql)
        RunQuery(sql)

        # Jump out of loop
        item['image_prompt'] = json_result['image_prompt']
        item['image_explanation'] = json_result['explanation']
        success = True

        print("---------------------------------------------------------------------------------------")
        print(f"LLM Image Prompt [Success] for Customer {item['customer_id']}")
        print("---------------------------------------------------------------------------------------")

      except Exception as e:
        retry += 1
        print("---------------------------------------------------------------------------------------")
        print(f"LLM Image Prompt [SQL Error] for Customer {item['customer_id']}: {sql}")
        print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"LLM Image Prompt [Error] for Customer {item['customer_id']}: {e}")
      print("---------------------------------------------------------------------------------------")

    if retry > 5:
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      print(f"LLM Image Prompt [Retry Limit Reached - Skipping] for Customer {item['customer_id']}")
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      break # Skip this customer

In [None]:
customer_ids = customer_id_list_str.split(', ')  # Split into individual IDs
customer_ids_formatted = "'" + "', '".join(customer_ids) + "'"  # Format each ID with quotes

In [None]:
# View Results
sql=f"""SELECT customer_id, llm_orginial_image_prompt, llm_orginial_image_prompt_response_json, llm_improved_image_prompt
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Call Imagen3 with the updated prompt</font>

In [None]:
# For each customer, generate the image using Imagen3
for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    image_prompt = item['image_prompt']
    print(f"Image Prompt: {image_prompt}")
    print(f"Image Prompt Explanation: {item['image_explanation']}")
    filename = ImageGen(item['image_prompt'])

    img = Image.open(filename)
    img.thumbnail([500,500]) # width, height
    IPython.display.display(img)

    # Save image to GCS
    dest_filename = f"email_campaign_{item['customer_id']}.png"
    copy_file_to_gcs(filename, storage_account, f"insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/{dest_filename}")
    item['gcs_filename'] = f"gs://{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/{dest_filename}"
    item['html_filename'] = f"https://storage.cloud.google.com/{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/{dest_filename}"

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                  SET image_gcs_filename = '{item['gcs_filename']}',
                      image_http_url = '{item['html_filename']}',
                      image_generated = TRUE
                WHERE customer_id = '{item['customer_id']}'"""

      #print(sql)
      RunQuery(sql)
      item['image_filename'] = filename

      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 [SQL Error] for Customer {item['customer_id']}: {sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Imagen3 [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# To view the bucket
print(f"https://console.cloud.google.com/storage/browser/{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}")

In [None]:
# View Results
sql=f"""SELECT customer_id, image_gcs_filename, image_http_url, image_generated
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Verify the Generated Image with Gemini Vision</font>

In [None]:
# Verify the generate image is correct

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "image_verified" : true
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "image_verified",
    "explanation"
  ],
  "properties": {
    "image_verified": {
      "type": "boolean"
    },
    "explanation": {
      "type": "string"
    }
  }
}

json_schema = '{ "image_verified" : true, "explanation" : "text" }'

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    image_prompt = item['image_prompt']
    print(f"Image Prompt: {image_prompt}")
    print(f"Image Prompt Explanation: {item['image_explanation']}")
    filename = item['image_filename']

    prompt = f"""I need you to verify that the below image meets the following criteria:
    <ImagePrompt>
    {item['image_prompt']}
    </ImagePrompt>
    <ImageExplanation>
    {item['image_explanation']}
    </ImageExplanation>
    """

    print(prompt)
    imageBase64 = convert_png_to_base64(filename)
    llm_result = GeminiLLM_VerifyImage(prompt, imageBase64, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")


    # Update table in BigQuery
    try:
      sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                  SET llm_verify_image_prompt = \"\"\"{prompt}\"\"\",
                      llm_verify_image_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_verify_image_text = \"\"\"{json_result['explanation']}\"\"\",
                      image_verified = {json_result['image_verified']}
                WHERE customer_id = '{item['customer_id']}'"""

      #print(sql)
      RunQuery(sql)

      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 Verification [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 Verification [SQL Error] for Customer {item['customer_id']}: {sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Imagen3 Verification [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_verify_image_prompt, llm_verify_image_response_json, llm_verify_image_text, image_verified
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Translate the Marketing Message to another language</font>

In [None]:
# Translate the marketing text into another language (we will randomly pick on)

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "translated_text" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "translated_text"
  ],
  "properties": {
    "translated_text": {
      "type": "string"
    }
  }
}


for item in customer_list:
  # Pick an random language from the list
  random_language = 12 # we will do German or you can do a Random language: random.randint(0,len(gemini_languages)-1)
  print(f"Random Language: {gemini_languages[random_language]}")
  print(f"Customer id: {item['customer_id']}")
  item['translation_language'] = gemini_languages[random_language]
  try:

    prompt = f"""You are an expert translator for the following language {gemini_languages[random_language]}.
    Translate the following text from English to {gemini_languages[random_language]}:
    <Text>
    {item['marketing_text']}
    </Text>
    """

    print(prompt)
    llm_result = GeminiLLM(prompt, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                  SET llm_translation_language_prompt = \"\"\"{prompt}\"\"\",
                      llm_translation_language_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_translation_language_prompt_response_text = \"\"\"{json_result['translated_text']}\"\"\"
                WHERE customer_id = '{item['customer_id']}'"""

      #print(sql)
      RunQuery(sql)
      item['translated_text'] = json_result['translated_text']

      print("---------------------------------------------------------------------------------------")
      print(f"Translation [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"Translation [SQL Error] for Customer {item['customer_id']}: {sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Translation [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_translation_language_prompt, llm_translation_language_prompt_response_json, llm_translation_language_prompt_response_text
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Verify the Translation</font>

In [None]:
# Verify the translation is correct

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "translation_verified" : true
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "translation_verified",
    "explanation"
  ],
  "properties": {
    "translation_verified": {
      "type": "boolean"
    },
    "explanation": {
      "type": "string"
    }
  }
}


for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    prompt = f"""I need you to verify that the below text is in the langugage of "{item['translation_language']}".
    It was originially in English, so make sure it is not still English.
    <Text>
    {item['translated_text']}
    </Text>
    """

    print(prompt)
    llm_result = GeminiLLM(prompt, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                  SET llm_validate_translation_prompt = \"\"\"{prompt}\"\"\",
                      llm_validate_translation_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_validate_translation_prompt_response_text = \"\"\"{json_result['explanation']}\"\"\",
                      translation_verified = {json_result['translation_verified']}
                WHERE customer_id = '{item['customer_id']}'"""

      #print(sql)
      RunQuery(sql)

      print("---------------------------------------------------------------------------------------")
      print(f"Translation Verification [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"Translation Verification [SQL Error] for Customer {item['customer_id']}: {sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Translation Verification [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_validate_translation_prompt, llm_validate_translation_prompt_response_json, llm_validate_translation_prompt_response_text, translation_verified
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Generate the HTML and Save</font>

In [None]:
html_template = """<!DOCTYPE html>
<html>
<head>
  <title>Insurance upgrade Campaign</title>
  <style>
    body {
      font-family: 'Helvetica Neue', sans-serif;
    }
    .email-campaign {
      background-color: #EDF2F9;
      padding: 20px;
      margin-bottom: 20px;
      border-bottom: 2px solid #ddd;
    }
    h3 {
      font-size: 16px;
      margin-bottom: 10px;
      color: #333;
    }
    p {
      font-size: 14px;
      line-height: 1.5;
      color: #555;
    }
  </style>
</head>
<body>
  <div class="email-campaign">
    <h3>Email Campaign (English)</h3>
    <p style="font-weight: bold;">Subject: ##email_subject##</p>
    <p>##marketing_text##</p>
  </div>
  <div>
    <img src="##html_filename##" width="500" height="500" alt="Item Image">
  </div>

  <hr/>

  <div class="email-campaign">
    <h3>Email Campaign (##translation_language##)</h3>
    <p>##translated_text##</p>
  </div>
  <div>
    <img src="##html_filename##" width="500" height="500" alt="Item Image">
  </div>

</body>
</html>
"""

In [None]:
# Create HTML using the Template

for item in customer_list:
  print(item)
  if 'html_filename' not in item:
    # Error generating image
    print("Error: 'html_filename' not in item")
    continue

  if 'translated_text' not in item:
    # Error generating translation
    print("Error: 'translated_text' not in item")
    continue

  # Replace the placeholders with the actual values
  html = html_template \
    .replace("##email_subject##", item['email_subject']) \
    .replace("##marketing_text##", item['marketing_text']) \
    .replace("##translation_language##", item['translation_language']) \
    .replace("##translated_text##", item['translated_text']) \
    .replace("##html_filename##", item['html_filename'])

  filename = f"email_campaign_{item['customer_id']}.html"

  # Save the HTML to a file
  with open(filename, "w") as f:
    f.write(html)

  copy_file_to_gcs(filename, storage_account,f"insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/" + filename)

  # Update table in BigQuery
  try:
    html_gcs_filename = f"gs://{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/{filename}"
    html_http_url = f"https://storage.cloud.google.com/{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}/{filename}"

    sql=f"""UPDATE `{dataset_id}.customer_hyper_personalised_email`
                SET html_gcs_filename = '{html_gcs_filename}',
                    html_http_url = '{html_http_url}',
                    html_generated = TRUE
              WHERE customer_id = '{item['customer_id']}'"""

    #print(sql)
    RunQuery(sql)

    print("---------------------------------------------------------------------------------------")
    print(f"HTML Generation [Success] for Customer {item['customer_id']}")
    print("---------------------------------------------------------------------------------------")

  except Exception as e:
    retry += 1
    print("---------------------------------------------------------------------------------------")
    print(f"HTML Generation [SQL Error] for Customer {item['customer_id']}: {sql}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# To view the bucket
print(f"https://console.cloud.google.com/storage/browser/{storage_account}/insurance-data/Campaign-Assets-Hyper-Personalised-Email/email-{formatted_date}")

In [None]:
# View Results
sql=f"""SELECT customer_id, html_gcs_filename, html_http_url, html_generated
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE();"""

print(sql)
df_process = RunQuery(sql)
df_process

In [None]:
filename = f"email_campaign_{customer_list[7]['customer_id']}.html"
IPython.display.HTML(filename=filename)

In [None]:
filename = f"email_campaign_{customer_list[9]['customer_id']}.html"
IPython.display.HTML(filename=filename)

### <font color='#4285f4'>View all results</font>

In [None]:
# View All Results
sql=f"""SELECT *
          FROM `{dataset_id}.customer_hyper_personalised_email`
         WHERE customer_id IN ({customer_ids_formatted})
           AND email_date = CURRENT_DATE();"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Clean Up</font>

In [None]:
# Placeholder

### <font color='#4285f4'>Reference Links</font>


- [Imagen3](https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation)