# Fake Data Generation with Gemini API

This notebook orchestrates data generation and insertion into the database.
Run each cell sequentially to generate and insert data while monitoring for errors.

# 0. Imports and Setup

In [None]:
import os
import json
import time
import re
from pathlib import Path
from typing import List, Dict, Any, Optional
from google import genai
from google.genai import types
from dotenv import load_dotenv

# Load environment variables FIRST so clients created later (e.g. the Gemini
# client, which reads GEMINI_API_KEY via os.getenv) can see values from .env.
load_dotenv()

# Import LangFuse AFTER loading env vars
# NOTE(review): presumably because the LangFuse SDK reads LANGFUSE_* settings
# at import / first use — confirm before reordering these imports.
from langfuse import observe, get_client  # traceability

# Import configurations and templates
from config import GEMINI_MODEL, GEMINI_TEMPERATURE, GEMINI_MAX_RETRIES, DATA_COUNTS
from templates import TEMPLATES, Genre, Label, Customer, Album, Order, Workflow, BaseModel
from db_connector import DatabaseConnector

# Prompts directory: holds the *_prompt.txt instruction files. This is a
# relative path, so it resolves against the notebook's working directory.
PROMPTS_DIR = Path("prompts")


In [None]:
# Testing environment: report which required settings are present without
# echoing credentials into (often saved or shared) notebook output.
print("üîç Checking configuration...\n")

# Check each variable
required = {
    "GEMINI_API_KEY": os.getenv("GEMINI_API_KEY"),
    "LANGFUSE_PUBLIC_KEY": os.getenv("LANGFUSE_PUBLIC_KEY"),
    "LANGFUSE_SECRET_KEY": os.getenv("LANGFUSE_SECRET_KEY"),
    "LANGFUSE_HOST": os.getenv("LANGFUSE_HOST"),
}

# Variables whose values are not credentials and may be shown in full.
NON_SECRET_VARS = {"LANGFUSE_HOST"}


def mask_env_value(name: str, value: str) -> str:
    """Return a display-safe form of an environment variable's value.

    Non-secret values (see ``NON_SECRET_VARS``) are returned unchanged.
    Credentials are reduced to a 4-character prefix plus their length, which
    is enough to tell keys apart. A long prefix would reveal most of a short
    API key to anyone who can read the notebook output.
    """
    if name in NON_SECRET_VARS:
        return value
    return f"{value[:4]}... ({len(value)} chars)"


all_set = True
for name, value in required.items():
    if value:
        print(f"‚úÖ {name}: {mask_env_value(name, value)}")
    else:
        print(f"‚ùå {name}: NOT SET")
        all_set = False

print("\n" + "="*50)
if all_set:
    print("üéâ Perfect! Ready to start tracing!")
else:
    print("‚ö†Ô∏è  Please add missing keys to your .env file")

## Tool 1: SmartJSON Extractor

In [None]:
class SmartJSONExtractor:
    """Robust JSON extraction from LLM responses.

    Strategies, tried in order:

    1. Parse the whole (stripped) text as JSON.
    2. Take the contents of a markdown code fence (```` ``` ```` or
       ```` ```json ````) found anywhere in the text and parse that.
    3. Scan for each ``[`` / ``{`` and decode the first complete JSON value
       starting there, ignoring any prose before or after it.
    """

    # First fenced block; tolerates an optional "json" tag and a missing
    # closing fence (truncated responses), in which case it runs to the end.
    _FENCE_RE = re.compile(
        r"```[ \t]*(?:json)?[ \t]*\n?(.*?)(?:```|\Z)",
        re.DOTALL | re.IGNORECASE,
    )

    def extract(self, text: Optional[str]) -> Dict[str, Any]:
        """
        Extract JSON from text with multiple fallback strategies

        Args:
            text: Raw text that may contain JSON (``None`` is treated as empty)

        Returns:
            Dict with 'success' (bool), 'data' (parsed JSON), 'error' (str)
        """
        if not text or not text.strip():
            return self._result(error="Empty response: no JSON to extract")

        stripped = text.strip()

        # Strategy 1: Try direct parsing
        try:
            return self._result(data=json.loads(stripped))
        except json.JSONDecodeError:
            pass

        # Strategy 2: Remove markdown code blocks (skip if nothing was removed,
        # since that would just repeat strategy 1)
        cleaned = self._remove_code_blocks(stripped)
        if cleaned != stripped:
            try:
                return self._result(data=json.loads(cleaned))
            except json.JSONDecodeError:
                pass

        # Strategy 3: Decode the first complete JSON array/object. raw_decode
        # stops at the end of that value, unlike a greedy regex that would
        # swallow everything up to the last bracket in the text.
        decoder = json.JSONDecoder()
        for match in re.finditer(r"[\[{]", stripped):
            try:
                data, _end = decoder.raw_decode(stripped, match.start())
            except json.JSONDecodeError:
                continue
            return self._result(data=data)

        return self._result(error="Failed to extract valid JSON from response")

    @staticmethod
    def _result(data: Any = None, error: Optional[str] = None) -> Dict[str, Any]:
        """Build the result dict returned by ``extract``."""
        return {"success": error is None, "data": data, "error": error}

    def _remove_code_blocks(self, text: str) -> str:
        """Remove markdown code block formatting

        Returns the contents of the first code fence in *text*, or the
        stripped text unchanged when no fence is present.
        """
        text = text.strip()
        match = self._FENCE_RE.search(text)
        if not match:
            return text
        inner = match.group(1).strip()
        # A fence written as "```\njson\n..." leaves the tag on its own line.
        if inner.startswith('json'):
            inner = inner[4:].strip()
        return inner

print("‚úì SmartJSONExtractor class loaded")

## Tool 2: Gemini Data Generator

In [None]:
class GeminiDataGenerator:
    """Generate realistic fake data using the Gemini API.

    Each ``generate_*`` method loads a natural-language prompt from
    ``PROMPTS_DIR`` and combines it with the matching JSON template from
    ``TEMPLATES`` into a structured prompt. It then asks Gemini for exactly
    ``count`` records. Responses are parsed with ``SmartJSONExtractor``.
    When a Pydantic model class is supplied, each record is also validated.
    """

    def __init__(self):
        self.client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
        self.extractor = SmartJSONExtractor()
        self.generation_config = types.GenerateContentConfig(
            temperature=GEMINI_TEMPERATURE,
            top_p=0.95,
            top_k=40,
        )
        self.TEMPLATES = TEMPLATES

    def _load_prompt(self, prompt_file: str) -> str:
        """Return the stripped text of ``PROMPTS_DIR / prompt_file``.

        Raises:
            FileNotFoundError: if the prompt file does not exist.
        """
        prompt_path = PROMPTS_DIR / prompt_file
        # Decode explicitly as UTF-8 so non-ASCII prompt text does not depend
        # on the platform's default encoding.
        return prompt_path.read_text(encoding='utf-8').strip()

    @observe()
    def _build_structured_prompt(
        self,
        instructions: str,
        schema: Dict[str, Any],
        count: int,
        reference_ids: Optional[Dict[str, List[str]]] = None
    ) -> str:
        """
        Build a structured prompt using the CRITICAL format with json.dumps schema

        Args:
            instructions: Natural language instructions for data generation
            schema: Schema template for a single record
            count: Number of records to generate
            reference_ids: Optional dict of reference IDs for foreign keys;
                only the first 10 IDs of each list are included

        Returns:
            Formatted prompt string
        """
        # The model must return an array of exactly `count` single-record objects.
        full_schema = {
            "type": "array",
            "items": schema,
            "minItems": count,
            "maxItems": count
        }

        prompt_parts = [
            "CRITICAL: Output ONLY valid JSON matching this exact schema.",
            "No other text, no markdown, no explanations.\n",
            f"Schema:\n{json.dumps(full_schema, indent=2)}\n",
            f"Instructions:\n{instructions}\n"
        ]

        if reference_ids:
            prompt_parts.append("Reference IDs (use these for foreign key fields):")
            for key, ids in reference_ids.items():
                # Only a sample of IDs is listed to keep the prompt short.
                prompt_parts.append(f"- {key}: {ids[:10]}")
            prompt_parts.append("")

        prompt_parts.append(f"Generate exactly {count} records.\n")
        prompt_parts.append("JSON:")

        return "\n".join(prompt_parts)

    @observe()
    def extract_structured_form(
        self,
        instructions: str,
        form_template: Dict[str, Any],
        count: int,
        reference_ids: Optional[Dict[str, List[str]]] = None,
        model_class: Optional[type[BaseModel]] = None
    ) -> List[Dict[str, Any]]:
        """
        Extract data matching a form template with validation

        Args:
            instructions: Natural language instructions for data generation
            form_template: Template defining the expected structure
            count: Number of records to generate
            reference_ids: Optional dict of reference IDs for foreign keys
            model_class: Optional Pydantic model class for validation

        Returns:
            List of record dictionaries ([] if every attempt failed)
        """
        full_prompt = self._build_structured_prompt(
            instructions,
            form_template,
            count,
            reference_ids
        )

        return self._generate_with_validation(full_prompt, count, model_class)

    @observe()
    def _generate_with_validation(
        self,
        prompt: str,
        expected_count: int,
        model_class: Optional[type[BaseModel]] = None,
        retry: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Generate records, retrying with exponential backoff on any failure.

        Args:
            prompt: Full prompt to send
            expected_count: Number of records requested. A different count is
                reported, and an empty result for a non-zero request is retried.
            model_class: Optional Pydantic model class for validation
            retry: Number of attempts already made (kept for backward
                compatibility; callers normally leave it at 0)

        Returns:
            List of record dictionaries, or [] once GEMINI_MAX_RETRIES retries
            (GEMINI_MAX_RETRIES + 1 attempts in total) have all failed.
        """
        total_attempts = GEMINI_MAX_RETRIES + 1
        attempt = retry
        while True:
            try:
                return self._generate_once(prompt, expected_count, model_class)
            except Exception as e:
                if attempt >= GEMINI_MAX_RETRIES:
                    print(f" Failed after {attempt + 1} attempts: {e}")
                    return []
                print(f"Error (attempt {attempt + 1}/{total_attempts}): {e}")
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
                attempt += 1

    def _generate_once(
        self,
        prompt: str,
        expected_count: int,
        model_class: Optional[type[BaseModel]]
    ) -> List[Dict[str, Any]]:
        """Make one Gemini call and return its parsed (optionally validated) records.

        Raises:
            ValueError: if the response contains no usable JSON records.
            Exception: anything raised by the Gemini client call itself.
        """
        response = self.client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
            config=self.generation_config
        )

        # response.text can be None (e.g. no text parts); treat it as empty.
        result = self.extractor.extract(response.text or "")
        if not result["success"]:
            raise ValueError(result["error"])

        records = self._as_record_list(result["data"])
        if not records and expected_count > 0:
            raise ValueError("Model returned an empty list of records")

        if model_class:
            records = self._validate_records(records, model_class)

        actual_count = len(records)
        if actual_count != expected_count:
            print(f"Warning: expected {expected_count} records, got {actual_count}")
        print(f"‚úì Generated {actual_count} validated records")
        return records

    @staticmethod
    def _as_record_list(data: Any) -> List[Any]:
        """Normalize parsed JSON to a list of records.

        A single JSON object is wrapped in a one-element list. Any other
        non-list value raises ValueError so the caller retries. Iterating a
        dict directly would yield its keys.
        """
        if isinstance(data, dict):
            return [data]
        if isinstance(data, list):
            return data
        raise ValueError(f"Expected a JSON array of records, got {type(data).__name__}")

    @staticmethod
    def _validate_records(
        records: List[Any],
        model_class: type[BaseModel]
    ) -> List[Any]:
        """Validate each record with *model_class*, keeping invalid ones as-is.

        This is best effort: a record that fails validation is kept unvalidated,
        so one bad field does not discard the whole batch. A warning is printed
        for each such record.
        """
        validated = []
        failures = 0
        for i, item in enumerate(records, start=1):
            try:
                validated.append(model_class(**item).model_dump())
            except Exception as e:
                failures += 1
                print(f"Validation warning for record {i}: {e}")
                validated.append(item)
        if failures:
            print(f"Warning: {failures} of {len(records)} records failed validation "
                  f"and were kept unvalidated")
        return validated

    # --- Entity-specific generators -------------------------------------
    @observe()
    def generate_genres(self, count: int) -> List[Dict]:
        """Generate music genres"""
        instructions = self._load_prompt('genre_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['genre'],
            count,
            model_class=Genre
        )

    @observe()
    def generate_labels(self, count: int) -> List[Dict]:
        """Generate record labels"""
        instructions = self._load_prompt('label_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['label'],
            count,
            model_class=Label
        )

    @observe()
    def generate_customers(self, count: int) -> List[Dict]:
        """Generate customers"""
        instructions = self._load_prompt('customer_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['customer'],
            count,
            model_class=Customer
        )

    @observe()
    def generate_albums(self, count: int, genre_ids: List[str], label_ids: List[str]) -> List[Dict]:
        """Generate albums with references to genres and labels"""
        instructions = self._load_prompt('album_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['album'],
            count,
            reference_ids={'genre_ids': genre_ids, 'label_ids': label_ids},
            model_class=Album
        )

    @observe()
    def generate_orders(self, count: int, customer_ids: List[str]) -> List[Dict]:
        """Generate orders referencing existing customers"""
        instructions = self._load_prompt('order_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['order'],
            count,
            reference_ids={'customer_ids': customer_ids},
            model_class=Order
        )

    @observe()
    def generate_workflows(self, count: int) -> List[Dict]:
        """Generate workflow definitions"""
        instructions = self._load_prompt('workflow_prompt.txt')
        return self.extract_structured_form(
            instructions,
            self.TEMPLATES['workflow'],
            count,
            model_class=Workflow
        )

print("‚úì GeminiDataGenerator class loaded")

## Debugging

In [None]:
def list_prompts():
    """List all available prompt files (``*.txt`` in ``PROMPTS_DIR``), sorted by name."""
    # Use the shared PROMPTS_DIR constant so this listing always matches the
    # directory GeminiDataGenerator._load_prompt reads from.
    if PROMPTS_DIR.is_dir():
        print("üìÑ Available prompt files:")
        for prompt_file in sorted(PROMPTS_DIR.glob("*.txt")):
            print(f"  - {prompt_file.name}")
    else:
        print("‚ö†Ô∏è  Prompts directory not found")

def show_prompt(prompt_name: str):
    """Display the content of ``PROMPTS_DIR / prompt_name`` between banner lines.

    Prints a "not found" message instead if the path is not a regular file.
    """
    prompt_path = PROMPTS_DIR / prompt_name
    if not prompt_path.is_file():
        print(f" Prompt file not found: {prompt_name}")
        return

    print(f"\n{'='*60}")
    print(f"PROMPT: {prompt_name}")
    print('='*60)
    # Explicit UTF-8 so non-ASCII prompts don't depend on the OS default encoding.
    print(prompt_path.read_text(encoding="utf-8"))
    print('='*60 + '\n')

def show_all_templates():
    """Pretty-print every JSON schema in ``TEMPLATES`` under an upper-cased heading."""
    rule = "=" * 60

    print("\n" + rule)
    print("JSON TEMPLATES (Schemas)")
    print(rule)

    for template_name, schema in TEMPLATES.items():
        heading = f"\n{template_name.upper()}:"
        print(heading)
        print(json.dumps(schema, indent=2))

    print("\n" + rule)

# Uncomment to view:
# list_prompts()
# show_prompt('genre_prompt.txt')
# show_all_templates()

print("‚úì Prompt/template inspection utilities loaded")

# 1. Initialize Generator and Database Connection + Traceability

In [None]:
# --- Core services (shared by every generation cell below) ---------------
generator = GeminiDataGenerator()   # Gemini-backed fake-data producer
db = DatabaseConnector()            # target database; connected in section 2

# --- Observability --------------------------------------------------------
langfuse_client = get_client()      # flushed at the end of the notebook

print("‚úì Generator, DB connector and langfuse initialized")

## 1.1. Inspect Prompt and Schema (Debugging)

In [None]:
# Optional: Inspect how prompts are structured
# This cell shows you the exact prompt and schema sent to Gemini API.
# For albums/orders, pass reference_ids so the foreign-key section is included.

def inspect_prompt_for_entity(
    entity_name: str,
    template_key: Optional[str] = None,
    count: int = 5,
    reference_ids: Optional[Dict[str, List[str]]] = None,
):
    """Print the structured prompt that would be sent for an entity.

    Args:
        entity_name: Prompt file stem, e.g. ``'genre'`` -> ``genre_prompt.txt``
            inside the prompts directory.
        template_key: Key into ``TEMPLATES``; defaults to ``entity_name``.
        count: Number of records the prompt asks for.
        reference_ids: Optional foreign-key ID lists (e.g.
            ``{'genre_ids': [...], 'label_ids': [...]}``). Without them the
            preview omits the "Reference IDs" section that album/order
            generation really sends.

    Note: ``_build_structured_prompt`` is decorated with ``@observe``, so each
    inspection is also recorded as a LangFuse observation.
    """
    generator_temp = GeminiDataGenerator()

    # Load the prompt
    prompt_text = generator_temp._load_prompt(f'{entity_name}_prompt.txt')

    # Get the schema
    schema = TEMPLATES[template_key if template_key is not None else entity_name]

    # Build the prompt using the same method the generator uses
    full_prompt = generator_temp._build_structured_prompt(
        prompt_text,
        schema,
        count,
        reference_ids
    )

    print(f"=== PROMPT FOR {entity_name.upper()} ===\n")
    print(full_prompt)
    print("\n" + "="*60)

# Example: Inspect genre prompt (comment/uncomment to test different entities)
# inspect_prompt_for_entity('genre', 'genre', 5)
# inspect_prompt_for_entity('album', 'album', 3, {'genre_ids': genre_ids, 'label_ids': label_ids})

print("‚úì Debugging utilities loaded. Uncomment lines above to inspect prompts.")

## 1.2 Error Tracking Setup

In [None]:
# Track errors and warnings throughout the process
error_log = []
warning_log = []

def log_error(step: str, error: Exception):
    """Log an error for later review"""
    error_log.append({"step": step, "error": str(error), "type": type(error).__name__})
    print(f" ERROR in {step}: {error}")

def log_warning(step: str, message: str):
    """Log a warning for later review"""
    warning_log.append({"step": step, "message": message})
    print(f"‚ö†Ô∏è  WARNING in {step}: {message}")

def show_logs():
    """Display all errors and warnings"""
    print("\n" + "="*60)
    print("ERROR AND WARNING SUMMARY")
    print("="*60)
    
    if error_log:
        print(f"\n ERRORS ({len(error_log)}):")
        for i, err in enumerate(error_log, 1):
            print(f"\n{i}. {err['step']} ({err['type']})")
            print(f"   {err['error']}")
    else:
        print("\n‚úì No errors!")
    
    if warning_log:
        print(f"\n‚ö†Ô∏è  WARNINGS ({len(warning_log)}):")
        for i, warn in enumerate(warning_log, 1):
            print(f"\n{i}. {warn['step']}")
            print(f"   {warn['message']}")
    else:
        print("\n‚úì No warnings!")
    
    print("\n" + "="*60)

print("‚úì Error tracking initialized")

# 2. Connect to Database and Generate Data

In [None]:
# Open the database connection used by every db.insert_* call below;
# it is closed in the final cell with db.close().
db.connect()
print("‚úì Connected to database")

## 2.1. Generate and Insert Genres

In [None]:
print("Generating genres...")
genres_data = generator.generate_genres(DATA_COUNTS['genres'])
print(f"Generated {len(genres_data)} genres")
# The generator returns [] (instead of raising) once its retries are
# exhausted, so record any shortfall; otherwise show_logs() reports a clean run.
if len(genres_data) != DATA_COUNTS['genres']:
    log_warning("genres", f"requested {DATA_COUNTS['genres']}, generated {len(genres_data)}")

genre_ids = db.insert_genres(genres_data)
print(f"‚úì Inserted {len(genre_ids)} genres")
print(f"Sample genre IDs: {genre_ids[:5]}")

## 2.2. Generate and Insert Labels

In [None]:
print("Generating labels...")
labels_data = generator.generate_labels(DATA_COUNTS['labels'])
print(f"Generated {len(labels_data)} labels")
# The generator returns [] (instead of raising) once its retries are
# exhausted, so record any shortfall; otherwise show_logs() reports a clean run.
if len(labels_data) != DATA_COUNTS['labels']:
    log_warning("labels", f"requested {DATA_COUNTS['labels']}, generated {len(labels_data)}")

label_ids = db.insert_labels(labels_data)
print(f"‚úì Inserted {len(label_ids)} labels")
print(f"Sample label IDs: {label_ids[:5]}")

## 2.3. Generate and Insert Customers

In [None]:
print("Generating customers...")
customers_data = generator.generate_customers(DATA_COUNTS['customers'])
print(f"Generated {len(customers_data)} customers")
# The generator returns [] (instead of raising) once its retries are
# exhausted, so record any shortfall; otherwise show_logs() reports a clean run.
if len(customers_data) != DATA_COUNTS['customers']:
    log_warning("customers", f"requested {DATA_COUNTS['customers']}, generated {len(customers_data)}")

customer_ids = db.insert_customers(customers_data)
print(f"‚úì Inserted {len(customer_ids)} customers")
print(f"Sample customer IDs: {customer_ids[:5]}")

## 2.4. Generate and Insert Albums

In [None]:
print("Generating albums...")
# Albums reference genre and label IDs in the prompt; if an earlier step
# produced none, the model has no real IDs to use for those foreign keys.
if not genre_ids or not label_ids:
    log_warning("albums", "no genre or label IDs available for album foreign keys")
albums_data = generator.generate_albums(DATA_COUNTS['albums'], genre_ids, label_ids)
print(f"Generated {len(albums_data)} albums")
# The generator returns [] (instead of raising) once its retries are exhausted.
if len(albums_data) != DATA_COUNTS['albums']:
    log_warning("albums", f"requested {DATA_COUNTS['albums']}, generated {len(albums_data)}")

album_ids = db.insert_albums(albums_data)
print(f"‚úì Inserted {len(album_ids)} albums")
print(f"Sample album IDs: {album_ids[:5]}")

## 2.5. Generate and Insert Inventory

In [None]:
print("Generating inventory...")
# NOTE(review): no inventory rows are generated or inserted in this cell. It
# only aliases album_ids, so the count printed below is the album count, not
# a count of inserted inventory records — confirm whether inventory is
# created elsewhere (e.g. by the database) before relying on it.
inventory_ids = album_ids  # Reuse album IDs for simplicity
print(f"‚úì Using {len(inventory_ids)} inventory records (one per album)")

## 2.6. Generate and Insert Orders

In [None]:
print("Generating orders...")
# Orders reference customer IDs in the prompt; without any, the model has no
# real IDs to use for the customer foreign key.
if not customer_ids:
    log_warning("orders", "no customer IDs available for order foreign keys")
orders_data = generator.generate_orders(DATA_COUNTS['orders'], customer_ids)
print(f"Generated {len(orders_data)} orders")
# The generator returns [] (instead of raising) once its retries are exhausted.
if len(orders_data) != DATA_COUNTS['orders']:
    log_warning("orders", f"requested {DATA_COUNTS['orders']}, generated {len(orders_data)}")

order_ids = db.insert_orders(orders_data)
print(f"‚úì Inserted {len(order_ids)} orders")
print(f"Sample order IDs: {order_ids[:5]}")

## 2.7. Generate and Insert Order Items

In [None]:
# NOTE(review): nothing is generated or inserted in this cell. The status line
# assumes order items are created elsewhere (perhaps by db.insert_orders or
# by the database schema) — confirm before relying on it.
print(
    "Generating order items...",
    "‚úì Order items handled through order-album relationships",
    sep="\n",
)

## 2.8. Generate and Insert Payments

In [None]:
# NOTE(review): nothing is generated or inserted in this cell. The status line
# assumes payment rows are created elsewhere and linked to orders by foreign
# keys — confirm before relying on it.
print(
    "Generating payments...",
    "‚úì Payment records linked to orders",
    sep="\n",
)

## 2.9. Generate and Insert Reviews

In [None]:
# NOTE(review): nothing is generated or inserted in this cell. The status line
# assumes reviews (customer -> album links) are created elsewhere — confirm
# before relying on it.
print(
    "Generating reviews...",
    "‚úì Review relationships handled by customer-album foreign keys",
    sep="\n",
)

## 2.10. Generate and Insert Workflows

In [None]:
print("Generating workflows...")
workflows_data = generator.generate_workflows(DATA_COUNTS['workflows'])
print(f"Generated {len(workflows_data)} workflows")
# The generator returns [] (instead of raising) once its retries are
# exhausted, so record any shortfall; otherwise show_logs() reports a clean run.
if len(workflows_data) != DATA_COUNTS['workflows']:
    log_warning("workflows", f"requested {DATA_COUNTS['workflows']}, generated {len(workflows_data)}")

# Debug: Inspect first workflow with complex JSON fields.
# default=str keeps this debug print from raising on values json can't
# encode natively (validated records come from Pydantic's model_dump(), which
# may keep e.g. datetime objects), so the insert below still runs.
print("\nüìã Sample generated workflow:")
print(json.dumps(workflows_data[0] if workflows_data else {}, indent=2, default=str))

workflow_ids = db.insert_workflows(workflows_data)
print(f"\n‚úì Inserted {len(workflow_ids)} workflows")
print(f"Sample workflow IDs: {workflow_ids[:5]}")

## 2.11. Completion Summary (Errors and Warnings)

In [None]:
print("=" * 60)
# Only claim success when nothing was recorded via log_error/log_warning;
# an unconditional success banner hides failed or short generation steps.
if error_log or warning_log:
    print(f"DATA GENERATION FINISHED WITH {len(error_log)} ERROR(S) "
          f"AND {len(warning_log)} WARNING(S)")
else:
    print("‚úì DATA GENERATION COMPLETED SUCCESSFULLY!")
print("=" * 60)

# Show error and warning summary
show_logs()

## 2.12. Generate and Insert Workflow Executions

In [None]:
# NOTE(review): nothing is generated or inserted in this cell. The status line
# assumes workflow executions are created elsewhere and reference workflow
# IDs — confirm before relying on it.
print(
    "Generating workflow executions...",
    "‚úì Workflow execution records linked to workflows",
    sep="\n",
)

## 2.13. Completion and Cleanup

In [None]:
print("=" * 60)
# Mirror the summary cell: only report success when no errors or warnings
# were recorded during the run.
if error_log or warning_log:
    print(f"DATA GENERATION FINISHED WITH {len(error_log)} ERROR(S) "
          f"AND {len(warning_log)} WARNING(S)")
else:
    print("‚úì DATA GENERATION COMPLETED SUCCESSFULLY!")
print("=" * 60)

In [None]:
# Flush all buffered traces to LangFuse so nothing is lost when the kernel stops.
langfuse_client.flush()
print("‚úì LangFuse traces flushed to dashboard")
# Point at the host actually configured (LANGFUSE_HOST, checked in section 0)
# rather than always the cloud URL, which is wrong for self-hosted setups.
langfuse_host = os.getenv("LANGFUSE_HOST") or "https://cloud.langfuse.com"
print(f"   Check your LangFuse dashboard: {langfuse_host}")

## Flush LangFuse Traces

# Close Database Connection



In [None]:
# Release the connection opened in section 2 with db.connect(); run this last,
# after every insert cell has finished.
db.close()
print("‚úì Database connection closed")