# Image Variant Generation with S3 Storage

This notebook:
1. Queries a database for deal IDs
2. Generates variant images using OpenAI
3. Stores images in S3
4. Implements async processing for efficiency

In [1]:
import pandas as pd
import os
import psycopg2
from dotenv import load_dotenv
import boto3
import time
import base64
import asyncio
import aiohttp
import tempfile
import sys
from botocore.exceptions import NoCredentialsError
from concurrent.futures import ThreadPoolExecutor
import subprocess
import json
from io import BytesIO

# Load environment variables
load_dotenv()

# Configure AWS credentials
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
)

## Query Database for Deal IDs

Execute SQL to get deal information including:
- deal_voucher_id
- original_image_id
- variant_image_id
- batch_name
- enter_test_ts
- exit_test_ts
- open_ai_prompt

In [2]:
def get_deals_for_processing():
    # Establish connection to Redshift
    conn = psycopg2.connect(
        host=os.environ.get("REDSHIFT_HOST", "bi-redshift.intwowcher.co.uk"),
        port=os.environ.get("REDSHIFT_PORT", "5439"),
        dbname=os.environ.get("REDSHIFT_DBNAME", "wowdwhprod"),
        user=os.environ.get("REDSHIFT_USER", "jenkins"),
        password=os.environ.get("REDSHIFT_PASSWORD", "9SDy1ffdfTV7")
    )
    
    # Example query - modify as needed
    query = """
WITH revenue_per_deal AS (
    SELECT deal_id, SUM(net) AS revenue_last_14_days
    FROM real.transactions
    WHERE order_date > sysdate - 14
    GROUP BY deal_id
)
SELECT
    CAST(dv.id AS INTEGER) AS id,
    dv.email_subject,
    dvc.name AS category_name,
    dvsc.name AS sub_category_name,
    CAST(COALESCE(rpd.revenue_last_14_days, 0) AS INTEGER) AS revenue_last_14_days,
    CAST(rank() OVER (ORDER BY COALESCE(rpd.revenue_last_14_days, 0) DESC) AS INTEGER) AS revenue_rank,
    CAST(dvi.id AS INTEGER) AS image_id_pos_0,
    'https://static.wowcher.co.uk/images/deal/' || dvi.deal_voucher_id || '/' || dvi.id || '.' || dvi.extension AS image_url_pos_0,
    dvi.extension
FROM real.deal_voucher dv
JOIN real.product p ON p.id = dv.id AND p.status_id = 1
LEFT JOIN revenue_per_deal rpd ON rpd.deal_id = dv.id
LEFT JOIN real.deal_voucher_site dvs ON dvs.deal_voucher_id = dv.id
LEFT JOIN real.deal_voucher_image dvi ON dvi.deal_voucher_id = dv.id AND dvi.position = 0
LEFT JOIN real.deal_voucher_category dvc ON dvc.id = dv.category_id
LEFT JOIN real.deal_voucher_sub_category dvsc ON dvsc.id = dv.sub_category_id
WHERE trunc(dv.closing_date) >= trunc(sysdate)
    AND NOT EXISTS (
        SELECT 1
        FROM temp.opt_image_variants oiv
        WHERE oiv.deal_voucher_id = dv.id
          AND (batch_name ILIKE '%manual%' AND status = 1)
    )
    AND dvc.canonical_path_type = 'NATIONAL'
GROUP BY dv.id, dv.email_subject, dvc.name, dvsc.name, dvi.id, dvi.deal_voucher_id, dvi.extension,rpd.revenue_last_14_days
ORDER BY COALESCE(rpd.revenue_last_14_days, 0) DESC
LIMIT 500;
    """
    
    df = pd.read_sql(query, conn)
    conn.close()
    
    return df
# Get deals to process
deals_df = get_deals_for_processing()
deals_df.head()

  df = pd.read_sql(query, conn)


Unnamed: 0,id,email_subject,category_name,sub_category_name,revenue_last_14_days,revenue_rank,image_id_pos_0,image_url_pos_0,extension
0,33032041,4-Seater Rattan Garden Furniture Set,Garden,Garden Furniture,15989,1,1386187,https://static.wowcher.co.uk/images/deal/33032...,jpg
1,32097419,Nine-Seater Garden Rattan Furniture Set,Garden,Garden Furniture,12384,2,1428878,https://static.wowcher.co.uk/images/deal/32097...,jpg
2,39821204,Emma Hybrid Premium Mattress,Home,Beds & Mattresses,11668,3,1581451,https://static.wowcher.co.uk/images/deal/39821...,jpg
3,36958413,Rattan Furniture Set & Fire Pit Table,Garden,Garden Furniture,6824,4,1447574,https://static.wowcher.co.uk/images/deal/36958...,jpg
4,37259236,Divan Bed & Memory Sprung Mattress,Home,Beds & Mattresses,5314,5,1456986,https://static.wowcher.co.uk/images/deal/37259...,jpg


## S3 Upload Functions

Functions to upload generated images to S3

In [3]:
def upload_to_s3(file_content, bucket_name, s3_key):
    """
    Upload a file to S3
    
    Parameters:
    - file_content: Binary content of the file
    - bucket_name: S3 bucket name
    - s3_key: Path in S3 where file will be stored
    
    Returns:
    - URL of the uploaded file
    """
    try:
        # Determine content type based on file extension
        extension = os.path.splitext(s3_key)[1].lower()
        content_type = 'image/jpeg' if extension in ['.jpg', '.jpeg'] else \
                      'image/png' if extension == '.png' else \
                      'image/webp' if extension == '.webp' else \
                      'application/octet-stream'
                      
        s3_client.put_object(
            Body=file_content,
            Bucket=bucket_name,
            Key=s3_key,
            ContentType=content_type
        )
        return f"https://static.wowcher.co.uk/{s3_key}"
    except NoCredentialsError:
        print("Credentials not available")
        return None

## Image Generation

Function to call the generate_image.py script and process the results

In [9]:
# Cell to replace external script dependency with integrated functionality

import pandas as pd
import os
import psycopg2
from dotenv import load_dotenv
import requests
import base64
from openai import OpenAI
from io import BytesIO

# Load environment variables if not already done
if 'client' not in locals():
    load_dotenv()
    client = OpenAI(api_key=os.getenv('OPEN_AI_API_KEY'))
    print(f"OpenAI client initialized with API key: {os.getenv('OPEN_AI_API_KEY')[:5]}...")

def get_deal_data_for_image(deal_id):
    """Get deal data needed for image generation"""
    # Establish connection to Redshift
    conn = psycopg2.connect(
        host=os.environ.get("REDSHIFT_HOST", "bi-redshift.intwowcher.co.uk"),
        port=os.environ.get("REDSHIFT_PORT", "5439"),
        dbname=os.environ.get("REDSHIFT_DBNAME", "wowdwhprod"),
        user=os.environ.get("REDSHIFT_USER", "jenkins"),
        password=os.environ.get("REDSHIFT_PASSWORD", "9SDy1ffdfTV7")
    )

    # Get email subject
    email_subject_query = """
    SELECT email_subject 
    FROM wowdwhprod.real.deal_voucher
    WHERE id = %s
    """
    with conn.cursor() as cur:
        cur.execute(email_subject_query, (deal_id,))
        email_subject_result = cur.fetchone()
        email_subject = email_subject_result[0] if email_subject_result else "Deal"

    # Get image URLs and extract extension information
    image_query = """
    SELECT 
        'https://static.wowcher.co.uk/images/deal/' || deal_voucher_id || '/' || id || '.' || extension AS image_url,
        extension
    FROM wowdwhprod.real.deal_voucher_image
    WHERE deal_voucher_id = %s
    ORDER BY position
    LIMIT 10
    """
    with conn.cursor() as cur:
        cur.execute(image_query, (deal_id,))
        image_results = cur.fetchall()
        image_urls = [row[0] for row in image_results]
        extensions = [row[1] for row in image_results]
        original_extension = extensions[0] if extensions else "png"

    # Get highlights
    highlights_query = """
    SELECT highlight 
    FROM wowdwhprod.real.deal_voucher_highlight 
    WHERE deal_voucher_id = %s
    limit 3
    """
    with conn.cursor() as cur:
        cur.execute(highlights_query, (deal_id,))
        highlights_results = cur.fetchall()
        highlights = [row[0] for row in highlights_results]

    conn.close()

    # Build the prompt
    formatted_highlights = "\n".join([f"• {h}" for h in highlights]) if highlights else ""
    prompt = f"""
Create ONE high-resolution hero image advertising **{email_subject}**.

Final image must contain **zero spelling mistakes**.  

1. **Source images** – You have multiple angles.  
   • Accurately represent the product; do **not** invent new colours or features.  
   • If variants exist, PICK ONE colour and keep it consistent.

2. **Scene & background**  
   • Place the product in a realistic, aspirational environment that makes sense for its use.  
   • Adjust lighting and depth of field so the product is the clear focal point.  
   • Background must not overpower or obscure the product.

3. **Infographic & text elements**  
    • Do **not** repeat the headline anywhere else in the artwork.  
    • Any additional text must be limited to the 2-4 call-outs listed below.
    
   • Overlay 2-4 concise call-outs drawn from these highlights:  
     {formatted_highlights}  
   • Position all call-outs **outside** the bottom-right 20% of the frame.

4. **Design constraints**  
   • Keep bottom-right area completely free of any graphics or text.  
   • Maintain 4 px padding around all text boxes.  
   • No brand logos unless provided in the source images.
    """
    return {
        'prompt': prompt,
        'image_urls': image_urls,
        'original_extension': original_extension
    }

def download_image_to_file(url, filename):
    """Download an image from URL and save to file"""
    response = requests.get(url)
    if response.status_code == 200:
        with open(filename, 'wb') as f:
            f.write(response.content)
        return filename
    else:
        raise Exception(f"Failed to download image from {url}")

def generate_image_integrated(deal_id, original_id, temp_dir):
    """
    Generate image using OpenAI's API (integrated version of generate_image.py)
    """
    try:
        print(f"Processing deal {deal_id} with original image {original_id}")
        # Get data for the deal
        deal_data = get_deal_data_for_image(deal_id)
        prompt = deal_data['prompt']
        image_urls = deal_data['image_urls']
        original_extension = deal_data['original_extension']
        
        # Create output filename
        output_filename = os.path.join(temp_dir, f"variant_{deal_id}_{original_id}.{original_extension}")
        
        if not image_urls:
            raise Exception("No images found for this deal")
            
        print(f"Found {len(image_urls)} images. Downloading up to 16 images...")
        
        # Download images
        image_files = []
        temp_filenames = []
        for idx, url in enumerate(image_urls[:16]):
            temp_filename = os.path.join(temp_dir, f"temp_image_{deal_id}_{idx}.png")
            download_image_to_file(url, temp_filename)
            temp_filenames.append(temp_filename)
            image_files.append(open(temp_filename, "rb"))
            
        print("Calling OpenAI API to edit images...")
        
        # Call OpenAI API
        result = client.images.edit(
            model="gpt-image-1",
            image=image_files,
            prompt=prompt,
            size="1536x1024",
            quality="high",
            background="auto",
            n=1
        )
        
        print("Received response from OpenAI API. Decoding and saving image...")
        
        # Process and save the response
        image_base64 = result.data[0].b64_json
        image_bytes = base64.b64decode(image_base64)
        with open(output_filename, "wb") as f:
            f.write(image_bytes)
            
        # Close file handles
        for f in image_files:
            f.close()
            
        # Delete temporary files
        for filename in temp_filenames:
            if os.path.exists(filename):
                try:
                    os.remove(filename)
                except:
                    pass
        
        print(f"Saved generated image to {output_filename}")
        
        # Process token usage details
        print("Token usage details:")
        token_info = {}
        
        total_tokens = result.usage.total_tokens
        input_tokens = result.usage.input_tokens
        output_tokens = result.usage.output_tokens
        input_text_tokens = result.usage.input_tokens_details.text_tokens
        input_image_tokens = result.usage.input_tokens_details.image_tokens
        
        token_info["Total tokens"] = str(total_tokens)
        token_info["Input tokens"] = str(input_tokens)
        token_info["Output tokens"] = str(output_tokens)
        token_info["Input text tokens"] = str(input_text_tokens)
        token_info["Input image tokens"] = str(input_image_tokens)
        
        # Calculate cost
        cost = (input_text_tokens * 5 + input_image_tokens * 10 + output_tokens * 40) / 1000000
        token_info["Cost"] = f"${cost:.6f}"
        
        print(f"Cost: ${cost:.6f}")
        
        return output_filename, original_extension, token_info
        
    except Exception as e:
        print(f"Error generating image for deal {deal_id}: {str(e)}")
        return None, None, None

OpenAI client initialized with API key: sk-pr...


## Asynchronous Processing

Process multiple deals in parallel using async

In [10]:
async def process_deals_async(deals_df, max_workers=4, 
                              deal_id_col='deal_id',
                              original_image_id_col='original_image_id'):
    """
    Process multiple deals asynchronously using the integrated image generation function
    """
    results = []
    
    # Display column names to help debugging
    print(f"Available columns in dataframe: {list(deals_df.columns)}")
    
    # Create a temporary directory for image files
    with tempfile.TemporaryDirectory() as temp_dir:
        # Use ThreadPoolExecutor for parallelization
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Create tasks for all deals
            futures = []
            for idx, row in deals_df.iterrows():
                # Get deal_id and original_id, with fallbacks if columns don't exist
                deal_id = row.get(deal_id_col)
                if deal_id is None and 'deal_voucher_id' in row:
                    deal_id = row['deal_voucher_id']
                elif deal_id is None and 'id' in row:
                    deal_id = row['id']
                
                original_id = row.get(original_image_id_col)
                if original_id is None and 'image_id' in row:
                    original_id = row['image_id']
                elif original_id is None:
                    original_id = "main"  # Fallback if no image id is found
                    
                if deal_id is None:
                    print(f"Warning: Could not find deal ID in row: {row}")
                    continue
                    
                future = executor.submit(generate_image_integrated, deal_id, original_id, temp_dir)
                futures.append((future, deal_id, original_id, row))
            
            # Process results as they complete
            for future, deal_id, original_id, row in futures:
                try:
                    image_path, extension, token_info = future.result()
                    
                    if image_path:
                        # Read the image file
                        with open(image_path, 'rb') as img_file:
                            img_content = img_file.read()
                        
                        # Upload to S3 with correct extension
                        s3_key = f"images/deal/{deal_id}/{original_id}_variant.{extension}"
                        s3_url = upload_to_s3(img_content, 'your-s3-bucket-name', s3_key)
                        
                        # Add to results
                        result_row = row.to_dict()
                        result_row.update({
                            'status': 'success',
                            's3_url': s3_url,
                            'token_info': token_info,
                            'extension': extension,
                            'processed_timestamp': pd.Timestamp.now()
                        })
                        results.append(result_row)
                    else:
                        # Add failure to results
                        result_row = row.to_dict()
                        result_row.update({
                            'status': 'failed',
                            'error': 'Image generation failed',
                            'processed_timestamp': pd.Timestamp.now()
                        })
                        results.append(result_row)
                        
                except Exception as e:
                    print(f"Error processing deal {deal_id}: {str(e)}")
                    result_row = row.to_dict()
                    result_row.update({
                        'status': 'failed',
                        'error': str(e),
                        'processed_timestamp': pd.Timestamp.now()
                    })
                    results.append(result_row)
    
    return pd.DataFrame(results)

## Run the Process

Execute the async processing and display results

In [11]:
deals_df_tail = deals_df.tail(25)

# Process deals asynchronously
results_df = await process_deals_async(deals_df_tail, max_workers=5)

# Display results
results_df[['deal_voucher_id', 'original_image_id', 'status', 's3_url', 'extension', 'processed_timestamp']].head()


Available columns in dataframe: ['id', 'email_subject', 'category_name', 'sub_category_name', 'revenue_last_14_days', 'revenue_rank', 'image_id_pos_0', 'image_url_pos_0', 'extension']
Processing deal 39832493 with original image main
Processing deal 28773789 with original image main
Processing deal 37564382 with original image main
Processing deal 32223669 with original image main
Processing deal 30021183 with original image main
Found 10 images. Downloading up to 16 images...Found 2 images. Downloading up to 16 images...

Found 6 images. Downloading up to 16 images...
Found 8 images. Downloading up to 16 images...
Found 5 images. Downloading up to 16 images...
Calling OpenAI API to edit images...
Calling OpenAI API to edit images...
Calling OpenAI API to edit images...
Calling OpenAI API to edit images...
Calling OpenAI API to edit images...
Received response from OpenAI API. Decoding and saving image...
Saved generated image to /var/folders/dk/zpttw0wn0192zryb4bdz19lr0000gn/T/tmpgst7

KeyError: "['deal_voucher_id', 'original_image_id', 's3_url'] not in index"

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

## Analyze Results

Calculate statistics and costs across the batch

In [None]:
# Summarize processing results
success_count = results_df[results_df['status'] == 'success'].shape[0]
failed_count = results_df[results_df['status'] == 'failed'].shape[0]
total_count = len(results_df)

print(f"Processing Summary:")
print(f"Total deals: {total_count}")
print(f"Successful: {success_count} ({success_count/total_count*100:.1f}%)")
print(f"Failed: {failed_count} ({failed_count/total_count*100:.1f}%)")

# Calculate total cost 
total_cost = 0.0
for idx, row in results_df.iterrows():
    if row['status'] == 'success' and 'token_info' in row and row['token_info'] and 'Cost' in row['token_info']:
        cost_str = row['token_info']['Cost'].replace('$', '')
        try:
            cost = float(cost_str)
            total_cost += cost
        except:
            pass

print(f"\nTotal estimated cost for this batch: ${total_cost:.4f}")