# Data Generation

In [0]:
# %pip install faker  # uncomment if faker is not already installed in this environment

In [0]:
import pandas as pd
import pyspark.sql.functions as F
import numpy as np
import random
import uuid

from datetime import datetime
from faker import Faker
from pyspark.sql.types import StringType

In [0]:
# Unity Catalog location (catalog, then schema) used by this notebook
catalog, schema = "jack_sandom", "ai_audience_segments"

## Step 1: Generate structured data for clustering

In [0]:
# Set seeds for every seedable RNG the notebook draws from:
#   - NumPy's global state (np.random.* calls and pandas .sample())
#   - the stdlib `random` module (random.choice when building post combinations)
# uuid.uuid4() values are not controlled by either seed.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

We need to use conditional probabilities in our data generation code: each segment draws its attributes from its own distributions, which "forces" the clusters we want to find later.

In [0]:
# Segment name -> number of synthetic people generated for that segment.
# Insertion order is preserved, so segments are processed in the order listed.
cluster_sizes = dict([
    ("Young Urban Professional", 250),
    ("Suburban Family-Oriented", 250),
    ("Retired Rural Dweller", 150),
    ("College Student", 150),
    ("High-Income Empty Nester", 200),
])

In [0]:
# Function to generate correlated data per cluster
def generate_cluster_data(cluster_name, size):
    """Generate synthetic demographic rows for one audience segment.

    Each supported segment draws its attributes from segment-specific
    distributions so that the segments stay separable when clustered later.
    All draws use NumPy's global random state, so output is reproducible only
    if ``np.random.seed`` was called beforehand.

    Args:
        cluster_name: One of "Young Urban Professional",
            "Suburban Family-Oriented", "Retired Rural Dweller",
            "College Student" or "High-Income Empty Nester".
        size: Number of rows to generate.

    Returns:
        A pandas DataFrame with ``size`` rows and the columns ``age``,
        ``income`` (rounded to the nearest 1,000), ``location``, ``education``,
        ``relationship_status``, ``number_dependants``, ``occupation`` and
        ``segment`` (set to ``cluster_name`` on every row).

    Raises:
        ValueError: If ``cluster_name`` is not one of the supported segments.
    """
    # NOTE: np.random.randint excludes the upper bound, e.g. (25, 35) -> 25..34.
    if cluster_name == "Young Urban Professional":
        ages = np.random.randint(25, 35, size)
        incomes = np.random.normal(50000, 10000, size).clip(30000, 150000)
        locations = ["Urban"] * size
        education_levels = np.random.choice(["Bachelor's", "Post Graduate"], size, p=[0.6, 0.4])
        relationship_statuses = np.random.choice(["Single", "Cohabiting"], size, p=[0.6, 0.4])
        number_dependants = np.random.choice([0, 1], size, p=[0.8, 0.2])
        occupations = np.random.choice(["Professional", "Executive"], size, p=[0.7, 0.3])

    elif cluster_name == "Suburban Family-Oriented":
        ages = np.random.randint(35, 50, size)
        incomes = np.random.normal(50000, 10000, size).clip(40000, 150000)
        locations = ["Suburban"] * size
        education_levels = np.random.choice(["Some College", "Bachelor's", "Post Graduate"], size, p=[0.3, 0.5, 0.2])
        relationship_statuses = ["Cohabiting"] * size
        number_dependants = np.random.choice([1, 2, 3, 4], size, p=[0.3, 0.4, 0.2, 0.1])
        occupations = np.random.choice(["Professional", "Skilled Trades"], size, p=[0.6, 0.4])

    elif cluster_name == "Retired Rural Dweller":
        ages = np.random.randint(60, 81, size)
        incomes = np.random.normal(40000, 5000, size).clip(20000, 60000)
        locations = ["Rural"] * size
        education_levels = np.random.choice(["High School", "Some College", "Bachelor's", "Post Graduate"], size, p=[0.5, 0.3, 0.1, 0.1])
        relationship_statuses = np.random.choice(["Cohabiting", "Widowed"], size, p=[0.7, 0.3])
        number_dependants = np.random.choice([0, 1], size, p=[0.8, 0.2])
        occupations = ["Retired"] * size

    elif cluster_name == "College Student":
        ages = np.random.randint(18, 22, size)
        incomes = np.random.normal(20000, 3000, size).clip(0, 40000)
        locations = ["Urban"] * size
        education_levels = ["Some College"] * size
        relationship_statuses = ["Single"] * size
        number_dependants = [0] * size
        occupations = ["Student"] * size

    elif cluster_name == "High-Income Empty Nester":
        ages = np.random.randint(50, 65, size)
        incomes = np.random.normal(120000, 20000, size).clip(80000, 200000)
        locations = ["Suburban"] * size
        education_levels = np.random.choice(["Bachelor's", "Post Graduate"], size, p=[0.5, 0.5])
        relationship_statuses = ["Cohabiting"] * size
        number_dependants = [0] * size
        occupations = np.random.choice(["Executive", "Professional"], size, p=[0.5, 0.5])

    else:
        # Without this branch an unknown name would only fail at the return
        # statement with an UnboundLocalError on `ages`.
        raise ValueError(f"Unknown cluster name: {cluster_name!r}")

    return pd.DataFrame({
        "age": ages,
        "income": incomes.round(-3),  # round to the nearest thousand
        "location": locations,
        "education": education_levels,
        "relationship_status": relationship_statuses,
        "number_dependants": number_dependants,
        "occupation": occupations,
        "segment": cluster_name
    })

In [0]:
# Build one frame per segment, in the order the segments are declared
segment_frames = []
for segment_name, n_rows in cluster_sizes.items():
    segment_frames.append(generate_cluster_data(segment_name, n_rows))

# Stack the segments, then shuffle the rows so segments are interleaved
demographic_df = (
    pd.concat(segment_frames, ignore_index=True)
    .sample(frac=1)
    .reset_index(drop=True)
)

# Prepend a random UUID as the first column
row_ids = [str(uuid.uuid4()) for _ in range(len(demographic_df))]
demographic_df.insert(0, 'uuid', row_ids)

In [0]:
# Preview the first five generated rows
demographic_df.head(n=5)

## Step 2: Generate social media posts

In [0]:
# Get random sample from demographic data (100 rows, index reset to 0..99)
# NOTE(review): sampled_df is not referenced again in the cells visible here — confirm it is still needed.
sampled_rows = demographic_df.sample(n=100)
sampled_df = sampled_rows.reset_index(drop=True)

In [0]:
# Define segment-specific products and possible emotions
segment_products = {
    "Young Urban Professional": ["smartphone", "laptop", "smartwatch", "wireless earbuds", "fitness tracker"],
    "Suburban Family-Oriented": ["family SUV", "grill", "home security system", "washing machine", "family board game"],
    "Retired Rural Dweller": ["gardening tools", "golf clubs", "heating blanket", "armchair", "bird feeder"],
    "College Student": ["backpack", "coffee maker", "gaming console", "textbooks", "bicycle"],
    "High-Income Empty Nester": ["luxury watch", "high-end camera", "luxury car", "wine fridge", "holiday package"]
}

emotions = ["excited", "angry", "satisfied", "frustrated", "disappointed", "scared", "relaxed", "confused", "amazed", "curious"]

# Loop-invariant lookups, computed once: the list of segment names and, for
# each segment, the uuids of the people that belong to it.
segment_names = list(segment_products.keys())
author_pool = {
    name: demographic_df.loc[demographic_df["segment"] == name, "uuid"]
    for name in segment_names
}

# Generate 100 random (author, segment, product, emotion) combinations.
# Each one is drawn independently, so duplicates are possible.
combinations = []
for _ in range(100):
    segment = random.choice(segment_names)
    # Pick an author who actually belongs to the chosen segment
    author_id = author_pool[segment].sample(1).values[0]
    product = random.choice(segment_products[segment])
    emotion = random.choice(emotions)
    combinations.append({
        "author_id": author_id,
        "segment": segment,
        "product": product,
        "emotion": emotion
    })

# Convert to DataFrame
combinations_df = pd.DataFrame(combinations)
combinations_sdf = spark.createDataFrame(combinations_df)

In [0]:
# Check how the generated combinations are spread across segments
segment_counts = combinations_sdf.groupBy("segment").count()
display(segment_counts)

In [0]:
# Create a temp view so the %sql cell can read the combinations when calling AI_QUERY
combinations_sdf.createOrReplaceTempView("sampled_audience")

In [0]:
%sql
-- Generate one social media post per row of sampled_audience with AI_QUERY.
-- Every literal that follows a column value starts with a space; without it the
-- prompt runs words together (e.g. "smartphonefrom", "excitedabout").
CREATE OR REPLACE TEMP VIEW sampled_audience_posts AS
SELECT
  author_id,
  AI_QUERY(
    "databricks-meta-llama-3-3-70b-instruct",
    "Generate a realistic social media post about a purchase of a " || product || " from the perspective of a " || segment || " who is " || emotion || " about the product. Make sure you sound like a " || segment || ". Keep it concise, authentic, and similar to what someone would post on X or Instagram. Include no more than two hashtags and emojis. Don't explicitly mention the segment or that you are an AI assistant. Remove quotation marks.",
    modelParameters => named_struct('max_tokens', 100)
  ) AS post
FROM sampled_audience

In [0]:
# Read the generated posts from the temp view and convert them to a pandas DataFrame
posts_sdf = spark.sql("select * from sampled_audience_posts")
posts_df = posts_sdf.toPandas()

In [0]:
# Inspect the generated posts (columns: author_id, post)
display(posts_df)

### Save demographic table and write social media posts to volume JSON

In [0]:
fake = Faker()

# Make the cell safe to re-run: DataFrame.insert raises ValueError when the
# column already exists, so drop anything added by a previous execution first.
posts_df = posts_df.drop(columns=['id', 'created_at'], errors='ignore')

# Generate post id and creation date
posts_df.insert(0, 'id', [str(uuid.uuid4()) for _ in range(len(posts_df))])
# One random timestamp per post, between 2024-01-01 00:00 and 2024-12-31 00:00
posts_df['created_at'] = [
    fake.date_time_between(datetime(2024, 1, 1), datetime(2024, 12, 31)).strftime('%Y-%m-%d %H:%M:%S')
    for _ in range(len(posts_df))
]

In [0]:
# Build the volume path from the configured catalog and schema so the posts
# land in the same catalog/schema as the demographic table.
uc_volume_path = f"/Volumes/{catalog}/{schema}/social_media_feed/posts.json"

# Write all posts as a single JSON array of records
posts_df.to_json(uc_volume_path, orient='records')

In [0]:
# Persist the demographic data as a Delta table, overwriting any existing copy.
# The segment label is dropped first so it is not stored in the saved table.
target_table = f"{catalog}.{schema}.audience_demographic"
demographic_sdf = spark.createDataFrame(demographic_df).drop("segment")
(
    demographic_sdf.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(target_table)
)