In [None]:
# Imports necessary libraries for Task 2 and re-initializes the client and variables.
# pandas_gbq is imported directly because pandas' DataFrame.to_gbq is deprecated in favor of pandas_gbq.to_gbq.
from google.cloud import bigquery
import pandas as pd
import pandas_gbq
from IPython.display import display

# Reuse PROJECT_ID when Task 1 already defined it in this kernel; otherwise set it here.
PROJECT_ID = globals().get('PROJECT_ID') or ''  # <-- YOUR GCP PROJECT ID
if not PROJECT_ID:
    # Fail fast: an empty project ID would silently build table IDs such as ".cymbal.customers".
    raise ValueError("PROJECT_ID is not set. Enter your GCP project ID before running this notebook.")
DATASET_ID = 'cymbal'

# Jobs from this client are billed to PROJECT_ID and run in us-central1.
client = bigquery.Client(project=PROJECT_ID, location="us-central1")

# Fully qualified BigQuery identifiers used throughout Task 2.
TABLE_ID_CUSTOMERS = f"{PROJECT_ID}.{DATASET_ID}.customers"
table_id_multimodal_reviews = f"{PROJECT_ID}.{DATASET_ID}.multimodal_customer_reviews"
GEMINI_MODEL_NAME = f'{PROJECT_ID}.{DATASET_ID}.gemini_flash_model'
table_id_segment_level_analysis = f"{PROJECT_ID}.{DATASET_ID}.segment_level_gemini_analysis"


print(f"Project ID: {PROJECT_ID}")

def run_bq_query(sql: str, client: "bigquery.Client"):
    """Run a BigQuery statement and, for SELECT queries, return its rows as a DataFrame.

    The job is waited on with ``result()`` *before* ``statement_type`` is inspected: that
    job statistic may not be populated yet right after ``client.query()`` returns, and reading
    it too early could send a SELECT down the non-SELECT branch and discard its rows.

    Args:
        sql: GoogleSQL text to execute.
        client: BigQuery client used to submit the job.

    Returns:
        The result of ``to_dataframe()`` for SELECT statements; ``None`` for any other
        statement type (DDL, DML, scripts) and ``None`` when the job fails. Errors are
        printed rather than raised, so callers must check for ``None``.
    """
    try:
        query_job = client.query(sql)
        print(f"Job {query_job.job_id} in state {query_job.state}")
        # Block until the job completes so statement_type reflects the finished job.
        query_job.result()
        if query_job.statement_type == 'SELECT':
            df = query_job.to_dataframe()
            print(f"Query complete. Fetched {len(df)} rows.")
            return df
        print(f"Query for statement type {query_job.statement_type} complete.")
        return None
    except Exception as e:
        # Best-effort notebook helper: report the failure and let the caller check for None.
        print(f"An error occurred: {e}")
        return None

In [None]:
# Spot-check the customers table: fetch a handful of rows with the demographic
# columns (age, gender, loyalty_member) that the persona segmentation relies on.
customers_sample_sql = (
    "SELECT customer_id, first_name, age, gender, loyalty_member "
    f"FROM `{TABLE_ID_CUSTOMERS}` LIMIT 5"
)
print(f"\nVerifying Table: {TABLE_ID_CUSTOMERS} (First 5 rows)")
df_customers_verify = run_bq_query(customers_sample_sql, client)
query_succeeded = df_customers_verify is not None
if query_succeeded:
    display(df_customers_verify)

In [None]:
# Find the distinct persona profiles (age group x gender x loyalty status) among customers
# who left a review; each profile becomes one Gemini prompt in the next cell.
# Customers missing age, gender, or loyalty status are excluded so every profile is fully specified.
sql_get_profiles = f"""
WITH EnrichedData AS (
    SELECT
        CASE
            WHEN c.age < 40 THEN 'Younger_Adult'
            ELSE 'Older_Adult'
        END AS age_group,
        UPPER(c.gender) AS gender,
        IF(c.loyalty_member, 'LOYAL', 'NON_LOYAL') AS loyalty_status
    FROM `{table_id_multimodal_reviews}` AS mcr
    JOIN `{TABLE_ID_CUSTOMERS}` AS c ON mcr.customer_id = c.customer_id
    WHERE c.age IS NOT NULL AND c.gender IS NOT NULL AND c.loyalty_member IS NOT NULL
)
SELECT DISTINCT
    CONCAT(age_group, '_', gender, '_', loyalty_status) AS persona_age_group_profile
FROM EnrichedData
ORDER BY 1;
"""
print("Identifying unique segment profiles for Gemini analysis...")
df_profiles = run_bq_query(sql_get_profiles, client)
if df_profiles is not None:
    display(df_profiles)

In [None]:
# Ask Gemini (through BigQuery ML.GENERATE_TEXT) for a complete persona analysis of each
# segment profile, one call per profile, then save all analyses to a single BigQuery table.
gemini_prompt_template = """
Based on the customer segment profile "{p}", generate a single, valid JSON object.
The JSON MUST contain these keys:
"persona_description" (a concise, one-sentence summary of this persona),
"summary" (a more detailed summary of their likely preferences),
"motivations" (what drives their purchasing decisions),
"needs" (what they look for in a product or service),
"marketing_pitch" (a short marketing pitch targeting them).
Ensure the entire output is ONLY this single JSON object.
"""


def sql_string_literal(text: str) -> str:
    """Return ``text`` as a single-quoted GoogleSQL string literal.

    GoogleSQL escapes characters inside quoted literals with a backslash; doubling a quote
    is not an escape there. Backslashes are escaped first so that the escapes added for
    quotes and line breaks are not themselves re-escaped.
    """
    escaped = (
        text.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f"'{escaped}'"


all_results = []
if df_profiles is not None:
    print(f"Starting Gemini analysis for {len(df_profiles)} profiles...")
    for profile in df_profiles['persona_age_group_profile']:
        # The prompt is flattened to a single line, then both it and the profile (which is
        # derived from table data) are escaped before being embedded in the SQL text.
        raw_prompt = gemini_prompt_template.format(p=profile).replace("\n", " ")
        prompt_for_sql = sql_string_literal(raw_prompt)
        profile_for_sql = sql_string_literal(profile)

        sql = f"""SELECT {profile_for_sql} as profile, ml_generate_text_llm_result AS analysis FROM ML.GENERATE_TEXT(MODEL `{GEMINI_MODEL_NAME}`,
            (SELECT {prompt_for_sql} AS prompt), STRUCT(0.5 AS temperature, 1024 as max_output_tokens, TRUE AS flatten_json_output))"""

        print(f"  Analyzing profile: '{profile}'...")
        result_df = run_bq_query(sql, client)
        if result_df is not None:
            all_results.append(result_df)

    if all_results:
        df_all_analysis = pd.concat(all_results, ignore_index=True)
        print(f"\nSaving {len(df_all_analysis)} analyses to '{table_id_segment_level_analysis}'...")
        # pandas_gbq.to_gbq replaces the deprecated DataFrame.to_gbq.
        pandas_gbq.to_gbq(df_all_analysis, table_id_segment_level_analysis, project_id=PROJECT_ID, if_exists='replace')
        print("Successfully saved results.")
    else:
        print("No Gemini analyses were produced; nothing was saved.")

In [None]:
# Show the raw Gemini rows exactly as stored, before any JSON parsing, with unlimited
# column width so the full 'analysis' text can be inspected.
raw_analysis_sql = f"SELECT * FROM `{table_id_segment_level_analysis}` LIMIT 5"
print(f"\n--- Verifying Raw Gemini Output in: {table_id_segment_level_analysis} ---")
df_raw_analysis = run_bq_query(raw_analysis_sql, client)
query_succeeded = df_raw_analysis is not None
if query_succeeded:
    with pd.option_context('display.max_colwidth', None):
        display(df_raw_analysis)

In [None]:
# Join every review with its customer's demographics and the Gemini analysis for that
# customer's persona profile, materialized as the final_customer_insights table.
# The segment profiles sent to Gemini exclude customers with a NULL age, gender, or loyalty
# flag. Here those customers get a NULL profile (and so a NULL gemini_persona_analysis)
# instead of being bucketed as 'Older_Adult' / 'NON_LOYAL' and matched to the wrong segment.
table_id_final_customer_insights = f"{PROJECT_ID}.{DATASET_ID}.final_customer_insights"
sql_create_final_table = f"""
CREATE OR REPLACE TABLE `{table_id_final_customer_insights}` AS
WITH EnrichedData AS (
    SELECT mcr.*, c.first_name, c.last_name, c.age, c.gender, c.loyalty_member,
        IF(
            c.age IS NULL OR c.gender IS NULL OR c.loyalty_member IS NULL,
            NULL,
            CONCAT(
                CASE WHEN c.age < 40 THEN 'Younger_Adult' ELSE 'Older_Adult' END, '_',
                UPPER(c.gender), IF(c.loyalty_member, '_LOYAL', '_NON_LOYAL')
            )
        ) AS persona_age_group_profile
    FROM `{table_id_multimodal_reviews}` AS mcr
    JOIN `{TABLE_ID_CUSTOMERS}` AS c ON mcr.customer_id = c.customer_id
)
SELECT enriched.*, persona.analysis AS gemini_persona_analysis
FROM EnrichedData enriched
LEFT JOIN `{table_id_segment_level_analysis}` persona ON enriched.persona_age_group_profile = persona.profile;
"""
print(f"Creating the final customer insights table '{table_id_final_customer_insights}'...")
run_bq_query(sql_create_final_table, client)


# Build the persona-definitions table: one row per segment profile with the one-sentence
# persona_description parsed from Gemini's JSON output. The output may be wrapped in a markdown
# code fence (``` or ```json) and surrounding whitespace, which is stripped before parsing.
# Rows whose cleaned output has no scalar persona_description are dropped.
final_persona_table_id = f"{PROJECT_ID}.{DATASET_ID}.customer_persona_definitions"
sql_create_personas = f"""
CREATE OR REPLACE TABLE `{final_persona_table_id}` AS
WITH cleaned_analysis AS (
  SELECT
    profile,
    -- Strip an optional leading ``` / ```json fence and trailing ``` fence, plus surrounding whitespace
    TRIM(REGEXP_REPLACE(analysis, r'(?i)^\\s*```(?:json)?\\s*|\\s*```\\s*$', '')) AS cleaned_json
  FROM
    `{table_id_segment_level_analysis}`
),
parsed AS (
  SELECT
    profile AS persona_age_group_profile,
    JSON_EXTRACT_SCALAR(cleaned_json, '$.persona_description') AS persona_description
  FROM
    cleaned_analysis
)
SELECT
    persona_age_group_profile,
    persona_description
FROM
    parsed
WHERE
    persona_description IS NOT NULL;
"""
print(f"\nCreating final persona definitions table from Gemini output: {final_persona_table_id}...")
run_bq_query(sql_create_personas, client)

print("\n--- Final Customer Persona Definitions (Generated by Gemini) ---")
df_personas = run_bq_query(f"SELECT * FROM `{final_persona_table_id}` ORDER BY 1", client)
if df_personas is not None:
    with pd.option_context('display.max_colwidth', None):
        display(df_personas)