# Imports

In [87]:
import openai
import json
import hashlib
import os
import re

# OpenAI Configuration

In [88]:
# Check OpenAI version for compatibility
OPENAI_VERSION = openai.__version__
from openai import RateLimitError, AuthenticationError, OpenAI

# Read the OpenAI API key from the environment instead of hard-coding it in source.
# SECURITY: a live key was previously committed in this cell -- revoke and rotate it.
openai.api_key = os.environ.get("OPENAI_API_KEY")
if not openai.api_key:
    print("Warning: OPENAI_API_KEY is not set; OpenAI API calls will fail.")

# Set the model to use
model = "gpt-4.1-nano"

# EmailChainGenerator

In [96]:
class EmailChainGenerator:
    """
    A generator for producing synthetic email threads between commercial real estate brokers
    and prospective tenants in Manhattan with enhanced uniqueness and diversity.

    Names, companies, emails and context hashes that were already produced are persisted
    to ``used_names.json`` / ``used_contexts.json`` in the working directory, so uniqueness
    holds across generator instances and repeated runs.
    """
    def __init__(self, prompt, num_chains=1, model="gpt-4.1-nano", dry_run=False):
        """
        Initializes the EmailChainGenerator.

        Args:
            prompt (str): The prompt to send to the OpenAI model.
            num_chains (int): Number of email chains to generate (stored only; the caller
                drives generation one chain at a time).
            model (str): The OpenAI model to use.
            dry_run (bool): If True, read a canned response from ``dry_run.txt`` instead of
                calling the API; no API client is created in that case.
        """
        self.prompt = prompt
        self.num_chains = num_chains
        self.dataset = []
        self.dry_run = dry_run
        self.model = model
        # A dry run never calls the API, so it must not require a valid key/client.
        self.client = None if dry_run else OpenAI(api_key=openai.api_key)
        self.used_names = self.load_used_names()
        self.used_contexts = self.load_used_contexts()
        self.industry_variations = [
            "Tech", "Finance", "Legal", "Healthcare", "Nonprofit", "Retail", "Creative", "Education", "Hospitality"
        ]
        self.neighborhoods = [
            "Flatiron", "Tribeca", "Hudson Yards", "East Village", "Grand Central", "Chelsea", "SoHo", "Midtown East", "Financial District"
        ]
        self.personalities = [
            "Enthusiastic", "Skeptical", "Detail-obsessed", "Price-sensitive", "Decisive", "Hesitant"
        ]

    @staticmethod
    def _write_json_atomic(path, data):
        """
        Write *data* as indented JSON to *path* without ever leaving a half-written file.

        The data goes to a sibling ``<name>_temp<ext>`` file first and is then moved over
        *path* with ``os.replace``. Exceptions propagate to the caller.
        """
        root, ext = os.path.splitext(path)
        temp_file = f"{root}_temp{ext or '.json'}"
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(temp_file, path)

    def load_used_names(self):
        """
        Load used names, companies and emails from ``used_names.json``.

        Returns:
            dict: Always has list values under "names", "companies" and "emails"; a missing,
            unreadable or partial file yields empty lists for the absent keys.
        """
        try:
            with open("used_names.json", "r", encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = {}
        if not isinstance(data, dict):
            data = {}
        # Fill in any missing key so later lookups/appends never raise KeyError.
        for key in ("names", "companies", "emails"):
            data.setdefault(key, [])
        return data

    def save_used_names(self):
        """Save used names, companies, and emails to a file atomically."""
        try:
            self._write_json_atomic("used_names.json", self.used_names)
        except Exception as e:
            print(f"Error saving used names: {e}")

    def load_used_contexts(self):
        """
        Load used context hashes from ``used_contexts.json``.

        Returns:
            list: Stored hashes, or an empty list if the file is missing, unreadable or not a list.
        """
        try:
            with open("used_contexts.json", "r", encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return []
        return data if isinstance(data, list) else []

    def save_used_contexts(self):
        """Save used context hashes to a file atomically."""
        try:
            self._write_json_atomic("used_contexts.json", self.used_contexts)
        except Exception as e:
            print(f"Error saving used contexts: {e}")

    def create_context_hash(self, tenant_profile):
        """
        Create a SHA-256 hash of key tenant profile fields to ensure unique contexts.

        The field order and formatting must stay stable: previously stored hashes in
        ``used_contexts.json`` are compared against this output.
        """
        fields = [
            tenant_profile["Company Details"]["Industry"],
            tenant_profile["Company Details"]["Company Size"],
            tenant_profile["Company Details"]["Growth Stage"],
            tenant_profile["Property Preferences"]["Property Type"],
            tenant_profile["Property Preferences"]["Preferred Neighborhood"],
            tenant_profile["Property Preferences"]["Estimated or Stated Budget"],
            tenant_profile["Property Preferences"]["Space Size"],
            tenant_profile["Property Preferences"]["Preferred Lease Term"],
            tenant_profile["Moving Timeline"],
            tenant_profile["Outcome"],
            ",".join(sorted(tenant_profile["Pain Points"])),
            ",".join(sorted(tenant_profile["Property Preferences"]["Must-Haves"])),
            ",".join(sorted(tenant_profile["Property Preferences"]["Nice-to-Haves"])),
            tenant_profile["Decision-Maker Role"],
            tenant_profile.get("Tenant Personality", "Unknown")
        ]
        context_str = "|".join(str(field) for field in fields)
        return hashlib.sha256(context_str.encode()).hexdigest()

    def enhance_prompt(self, retry_count=0):
        """Enhance the prompt to enforce uniqueness and diversity."""
        used_names_str = ", ".join(self.used_names["names"]) if self.used_names["names"] else "none"
        used_companies_str = ", ".join(self.used_names["companies"]) if self.used_names["companies"] else "none"
        used_emails_str = ", ".join(self.used_names["emails"]) if self.used_names["emails"] else "none"
        context_guidance = (
            f"Generate a unique tenant profile with a distinct combination of:\n"
            f"- Industry: Choose from {', '.join(self.industry_variations)} or a novel industry.\n"
            f"- Neighborhood: Choose from {', '.join(self.neighborhoods)} or a new Manhattan area.\n"
            f"- Personality: Choose from {', '.join(self.personalities)}.\n"
            f"- Unique pain points, budget, timeline, and outcome.\n"
            f"Avoid any similarity to previous names, companies, emails, or contexts. "
            f"Create a new company name and representative name that do not resemble: {used_names_str}, {used_companies_str}, {used_emails_str}. "
            f"Ensure the email chain reflects realistic Manhattan CRE dynamics, with varied tenant personalities and broker tactics."
        )
        retry_instruction = (
            f"Generate a completely different context from previous attempts. "
            f"This is attempt {retry_count + 1}. Ensure valid JSON output with no syntax errors."
        ) if retry_count > 0 else ""
        enhanced_prompt = (
            self.prompt.replace("[DYNAMIC_LIST_NAMES]", used_names_str)
            .replace("[DYNAMIC_LIST_COMPANIES]", used_companies_str)
            .replace("[DYNAMIC_LIST_CONTEXTS]", "none")  # Contexts are handled via hash
            + f"\n\n{context_guidance}\n\n{retry_instruction}"
        )
        return enhanced_prompt

    def clean_response_text(self, text):
        """
        Clean an API response down to its JSON object text.

        Strips Markdown code fences (```json ... ``` or bare ```), then returns the span
        from the first '{' to the last '}'. The result still has to be parsed with
        json.loads; this only isolates the candidate object.

        Returns:
            str | None: The candidate JSON object text, or None if there is no '{...}' span.
        """
        if not text:
            return None
        # Opening fence (optionally tagged "json") at a line start, or a closing fence at a
        # line end. Note: `\s`, not `\\s` -- in a raw string `\\s` means a literal backslash.
        text = re.sub(r"^```(?:json)?\s*|\s*```\s*$", "", text, flags=re.MULTILINE | re.IGNORECASE)
        text = text.strip()
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            return None
        return text[start:end + 1]

    def _load_dry_run(self):
        """Read (email_chain, tenant_profile) from ``dry_run.txt``; (None, None) on error."""
        try:
            with open("dry_run.txt", "r", encoding="utf-8") as f:
                parsed = json.load(f)
        except FileNotFoundError:
            print("Error: dry_run.txt file not found.")
            return None, None
        except json.JSONDecodeError as e:
            print(f"Error parsing dry_run.txt as JSON: {e}")
            return None, None
        if not isinstance(parsed, dict):
            print("Error: dry_run.txt must contain a JSON object.")
            return None, None
        return parsed.get("email_chain"), parsed.get("tenant_profile")

    def generate_email_chain(self):
        """
        Generates a single email chain and corresponding tenant profile.

        In dry-run mode the pair is read from ``dry_run.txt``. Otherwise the model is queried
        up to 10 times. An attempt is retried when the response is not parseable JSON, lacks
        expected fields, fails ``validate_data``, or reuses a name/company/email/context.
        Uniqueness state is recorded and persisted only for an attempt that passes every check.

        Returns:
            tuple: (email_chain, tenant_profile) or (None, None) on error.
        """
        if self.dry_run:
            return self._load_dry_run()

        max_retries = 10  # Increase retries for better uniqueness
        for retry in range(max_retries):
            try:
                prompt = self.enhance_prompt(retry)
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": "You are an email thread generator for real estate. Output only valid JSON with no additional text."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.8,  # Increase temperature for more diversity
                    max_tokens=4000
                )
                # `content` can be None (e.g. refusals); treat it like an empty response.
                response_text = response.choices[0].message.content or ""
                cleaned_text = self.clean_response_text(response_text)
                if not cleaned_text:
                    print(f"Error: Invalid JSON structure in API response on attempt {retry + 1}.")
                    continue
                parsed = json.loads(cleaned_text)
                email_chain = parsed["email_chain"]
                tenant_profile = parsed["tenant_profile"]

                # Validate email chain structure
                if not isinstance(email_chain, list) or not all(
                    isinstance(email, dict) and all(key in email for key in ["timestamp", "from", "to", "subject", "body"])
                    for email in email_chain
                ):
                    print(f"Error: Invalid email chain structure or missing email bodies on attempt {retry + 1}.")
                    continue

                # Reject data the caller would drop *before* recording uniqueness state,
                # so a failed chain doesn't permanently consume a name/company/context.
                if not self.validate_data(email_chain, tenant_profile):
                    print(f"Error: Generated data failed validation on attempt {retry + 1}.")
                    continue

                # Validate name, company, and email
                rep = tenant_profile["Tenant Representative Details"]
                first_name = rep["First Name"]
                last_name = rep["Last Name"]
                company = tenant_profile["Company Details"]["Company Name"]
                email = rep["Email"]
                full_name = f"{first_name} {last_name}"
                if (full_name in self.used_names["names"] or
                    company in self.used_names["companies"] or
                    email in self.used_names["emails"]):
                    print(f"Error: Duplicate name '{full_name}', company '{company}', or email '{email}' detected on attempt {retry + 1}.")
                    continue

                # Validate context
                context_hash = self.create_context_hash(tenant_profile)
                if context_hash in self.used_contexts:
                    print(f"Error: Duplicate context detected on attempt {retry + 1}.")
                    continue

                # Add to used names, companies, emails, and contexts
                self.used_names["names"].append(full_name)
                self.used_names["companies"].append(company)
                self.used_names["emails"].append(email)
                self.used_contexts.append(context_hash)
                self.save_used_names()
                self.save_used_contexts()

                return email_chain, tenant_profile

            except (RateLimitError, AuthenticationError) as e:
                print(f"OpenAI API error: {e}")
                return None, None
            except json.JSONDecodeError as e:
                print(f"Error parsing API response as JSON: {e}")
                continue
            except (KeyError, TypeError) as e:
                # Missing or ill-typed fields in the model's output: retry, don't abort the run.
                print(f"Error: Malformed tenant profile in API response on attempt {retry + 1}: {e!r}")
                continue
            except Exception as e:
                print(f"Error generating email chain: {e}")
                return None, None

        print(f"Failed to generate unique chain after {max_retries} attempts.")
        return None, None

    def validate_data(self, email_chain, tenant_profile):
        """
        Validates the email chain and tenant profile.

        Args:
            email_chain: List of email dictionaries.
            tenant_profile: The tenant profile dictionary.

        Returns:
            bool: True if the profile is a dict with all required keys, the chain is a
            list/tuple of 2-8 dicts, and every email has a non-empty "body"; False otherwise.
        """
        required_profile_keys = [
            "Tenant Representative Details", "Company Details", "Property Preferences",
            "First Interaction", "Last Interaction", "Moving Timeline", "Pain Points",
            "Urgency Score", "Outcome", "Tenant Personality"
        ]
        if not email_chain or not tenant_profile:
            return False
        if not isinstance(tenant_profile, dict) or not isinstance(email_chain, (list, tuple)):
            print("Warning: Unexpected type for email chain or tenant profile.")
            return False
        missing = [key for key in required_profile_keys if key not in tenant_profile]
        if missing:
            print(f"Warning: Missing required keys in tenant profile: {missing}")
            return False
        if not (2 <= len(email_chain) <= 8):
            print("Warning: Email chain length must be 2–8 emails.")
            return False
        for email in email_chain:
            # Non-dict entries would otherwise crash on .get().
            if not isinstance(email, dict) or not email.get("body"):
                print("Warning: Email missing body.")
                return False
        return True

    def save_as_json(self, filename="synthetic_email_data.json"):
        """
        Appends the dataset to an existing JSON file or creates a new one.

        An existing file that is unreadable JSON or not a JSON list is left untouched (an
        error is printed) rather than being overwritten, so previously saved chains are
        never lost. The write itself is atomic.

        Args:
            filename (str): The file path where the dataset should be saved.
        """
        try:
            try:
                with open(filename, "r", encoding="utf-8") as f:
                    existing_data = json.load(f)
            except FileNotFoundError:
                existing_data = []
            except json.JSONDecodeError as e:
                print(f"Error saving data to JSON: {filename} is not valid JSON ({e}); refusing to overwrite it.")
                return

            if not isinstance(existing_data, list):
                print(f"Error saving data to JSON: {filename} does not contain a JSON list; refusing to overwrite it.")
                return

            existing_data.extend(self.dataset)
            self._write_json_atomic(filename, existing_data)
            print(f"Appended {len(self.dataset)} chains to {filename}. Total chains: {len(existing_data)}")
        except Exception as e:
            print(f"Error saving data to JSON: {e}")

# Iterations

In [90]:
def load_prompt(path="prompt.txt"):
    """
    Load the generation prompt template from a text file.

    Args:
        path (str): File to read; defaults to ``prompt.txt`` in the working directory.

    Returns:
        str: The full file contents.

    Raises:
        FileNotFoundError: If *path* does not exist (an error message is printed first).
    """
    try:
        # Explicit UTF-8 so non-ASCII prompt text (curly quotes, en dashes, ...) decodes
        # identically on every platform instead of depending on the locale default.
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print(f"Error: {path} not found.")
        raise

# Load prompt
# `prompt` is the module-level template later passed to every EmailChainGenerator in the
# generation loop. load_prompt() re-raises FileNotFoundError when prompt.txt is missing,
# so the success message below only prints after the file was actually read.
prompt = load_prompt()
print("Prompt successfully loaded.")

Prompt successfully loaded.


In [104]:
# Configure generation
total_chains = 200  # Target number of chains
num_chains_per_batch = 1  # Generate one chain at a time
output_file = "synthetic_email_data.json"

# Load the existing dataset so a rerun resumes from the current chain count.
try:
    with open(output_file, "r", encoding="utf-8") as f:
        existing_data = json.load(f)
except FileNotFoundError:
    existing_data = []  # First run: nothing generated yet.
except json.JSONDecodeError as e:
    # A corrupted dataset must not be treated as empty: that would restart generation
    # from zero and risk overwriting the previously generated chains.
    raise RuntimeError(f"{output_file} exists but is not valid JSON: {e}") from e

if not isinstance(existing_data, list):
    raise RuntimeError(f"{output_file} must contain a JSON list of chains.")
current_chains = len(existing_data)

chains_to_generate = max(0, total_chains - current_chains)
print(f"Current chains: {current_chains}. Need to generate: {chains_to_generate}")

Current chains: 200. Need to generate: 0


In [105]:
# Generate chains one at a time. A fresh generator per chain keeps its `dataset` holding
# only that chain, so each save_as_json call appends exactly one entry to the file.
added = 0  # chains successfully added during this run
for i in range(chains_to_generate):
    chain_number = current_chains + i + 1
    print(f"Generating chain {chain_number}/{total_chains}...")
    generator = EmailChainGenerator(
        prompt=prompt,
        num_chains=num_chains_per_batch,
        model=model,  # the model configured in the OpenAI Configuration cell
        dry_run=False
    )
    # Generate one chain
    email_chain, tenant_profile = generator.generate_email_chain()

    if email_chain is not None and generator.validate_data(email_chain, tenant_profile):
        generator.dataset.append({
            "email_chain": email_chain,
            "tenant_profile": tenant_profile
        })
        print(f"Successfully added chain {chain_number}")
        # Append immediately to JSON
        generator.save_as_json(output_file)
        added += 1
    else:
        print(f"Failed to add chain {chain_number}")

# Report from the run-wide counter: `generator` is never bound when no chains are needed,
# and a single generator's dataset only ever holds the last chain.
print(f"Generation complete. Total chains in {output_file}: {current_chains + added}")

Generation complete. Total chains in synthetic_email_data.json: 200
