# OpenAI Sentiment Analysis for Airbnb Reviews

## Overview

This notebook demonstrates sentiment analysis of Airbnb review comments using OpenAI's Large Language Models (LLMs). The solution uses prompt engineering techniques to classify reviews as positive (1), neutral (0), or negative (-1).

## Table of Contents

1. [Setup and Dependencies](#setup)
2. [Configuration](#configuration)
3. [Data Loading](#data-loading)
4. [Prompt Engineering](#prompt-engineering)
5. [Sentiment Prediction](#sentiment-prediction)
6. [Results Analysis](#results-analysis)
7. [Cost Tracking](#cost-tracking)
8. [Export Results](#export-results)
9. [Summary](#summary)

## Prerequisites

- OpenAI API key
- Python 3.11+
- Required packages: openai, pandas, numpy, matplotlib, python-dotenv

## Features

- **Flexible Model Selection**: Support for GPT-4, GPT-3.5-turbo, and other OpenAI models
- **Robust Error Handling**: Automatic retry with exponential backoff for rate limits
- **Cost Tracking**: Monitor token usage and estimate API costs
- **Batch Processing**: Efficiently process multiple reviews with progress tracking
- **Results Export**: Save predictions to CSV for downstream analysis
- **Visualization**: Charts and metrics for sentiment distribution

---
## 1. Setup and Dependencies <a id='setup'></a>

Install required packages and import necessary libraries.

In [None]:
# Install required dependencies
# Uncomment the following line to install packages
# !pip install openai pandas numpy matplotlib python-dotenv hypothesis pytest

In [None]:
# Import standard libraries
import os
import sys
import time
import logging
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from pathlib import Path
import json
import getpass

# Import data processing libraries
import pandas as pd
import numpy as np

# Import visualization libraries
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# Import OpenAI
from openai import OpenAI
from openai import OpenAIError, RateLimitError, APIError, APIConnectionError, Timeout

# Import environment variable management
from dotenv import load_dotenv

# Configure matplotlib
plt.style.use('seaborn-v0_8-darkgrid')
%matplotlib inline

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)
pd.set_option('display.width', None)

### Configure Logging

Set up logging to track API calls, errors, and processing progress.

In [None]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('openai_sentiment_analysis.log'),
        logging.StreamHandler(sys.stdout)
    ]
)

logger = logging.getLogger(__name__)
logger.info("OpenAI Sentiment Analysis Notebook initialized")
logger.info(f"Notebook started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

### Load Environment Variables

Load API keys and configuration from environment variables.

In [None]:
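# Load environment variables from a .env file, if one exists.
# load_dotenv() returns False when it found no variables to load (e.g. no .env file).
if load_dotenv():
    logger.info("Environment variables loaded from .env file")
else:
    logger.info("No variables loaded from .env; using the existing environment")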

### Verify Installation

Check that all required packages are installed and accessible.

In [None]:
# Verify package versions
import openai

print("Package Versions:")
print(f"  OpenAI: {openai.__version__}")
print(f"  Pandas: {pd.__version__}")
print(f"  NumPy: {np.__version__}")
print(f"  Python: {sys.version}")

logger.info("All required packages verified")

---
## 2. Configuration <a id='configuration'></a>

Configure the OpenAI API client and set analysis parameters.

### Configuration Dataclass

Define a configuration dataclass to manage all parameters for the sentiment analysis pipeline.

In [None]:
@dataclass
class Config:
    """Configuration for OpenAI sentiment analysis.
    
    This dataclass centralizes all configuration parameters including API credentials,
    model settings, processing options, and file paths.
    
    Attributes:
        api_key: OpenAI API key for authentication
        model_name: Name of the OpenAI model to use (e.g., 'gpt-3.5-turbo', 'gpt-4')
        temperature: Sampling temperature (0.0 for deterministic, higher for creative)
        max_tokens: Maximum tokens in model response
        batch_size: Number of reviews to process in each batch
        rate_limit_delay: Delay in seconds between API requests
        max_reviews: Maximum number of reviews to process (None for all)
        data_file: Path to input CSV file with reviews
        output_dir: Directory for saving results
        max_retries: Maximum number of retry attempts for failed requests
        backoff_factor: Multiplier for exponential backoff delays
    """
    
    # API Configuration
    api_key: str
    model_name: str = "gpt-3.5-turbo"
    
    # Model Parameters
    temperature: float = 0.0  # Deterministic for consistency
    max_tokens: int = 10      # Short responses for sentiment labels
    
    # Processing Configuration
    batch_size: int = 100
    max_reviews: Optional[int] = None
    rate_limit_delay: float = 0.5  # Seconds between requests
    
    # File Configuration
    data_file: str = "reviews/paris-2015-09-02-reviews.csv"
    output_dir: str = "results/"
    
    # Retry Configuration
    max_retries: int = 3
    backoff_factor: float = 2.0
    
    def __post_init__(self):
        """Validate configuration parameters after initialization."""
        self._validate()
    
    def _validate(self) -> None:
        """Validate configuration parameters.
        
        Raises:
            ValueError: If any parameter is invalid
        """
        # Validate API key
        if not self.api_key or not isinstance(self.api_key, str):
            raise ValueError("API key must be a non-empty string")
        
        if len(self.api_key.strip()) == 0:
            raise ValueError("API key cannot be empty or whitespace")
        
        # Validate model name
        if not self.model_name or not isinstance(self.model_name, str):
            raise ValueError("Model name must be a non-empty string")
        
        # Validate temperature
        if not isinstance(self.temperature, (int, float)):
            raise ValueError("Temperature must be a number")
        
        if not 0.0 <= self.temperature <= 2.0:
            raise ValueError("Temperature must be between 0.0 and 2.0")
        
        # Validate max_tokens
        if not isinstance(self.max_tokens, int):
            raise ValueError("max_tokens must be an integer")
        
        if self.max_tokens <= 0:
            raise ValueError("max_tokens must be positive")
        
        # Validate batch_size
        if not isinstance(self.batch_size, int):
            raise ValueError("batch_size must be an integer")
        
        if self.batch_size <= 0:
            raise ValueError("batch_size must be positive")
        
        # Validate rate_limit_delay
        if not isinstance(self.rate_limit_delay, (int, float)):
            raise ValueError("rate_limit_delay must be a number")
        
        if self.rate_limit_delay < 0:
            raise ValueError("rate_limit_delay must be non-negative")
        
        # Validate max_reviews
        if self.max_reviews is not None:
            if not isinstance(self.max_reviews, int):
                raise ValueError("max_reviews must be an integer or None")
            
            if self.max_reviews <= 0:
                raise ValueError("max_reviews must be positive")
        
        # Validate data_file
        if not self.data_file or not isinstance(self.data_file, str):
            raise ValueError("data_file must be a non-empty string")
        
        # Validate output_dir
        if not self.output_dir or not isinstance(self.output_dir, str):
            raise ValueError("output_dir must be a non-empty string")
        
        # Validate max_retries
        if not isinstance(self.max_retries, int):
            raise ValueError("max_retries must be an integer")
        
        if self.max_retries < 0:
            raise ValueError("max_retries must be non-negative")
        
        # Validate backoff_factor
        if not isinstance(self.backoff_factor, (int, float)):
            raise ValueError("backoff_factor must be a number")
        
        if self.backoff_factor < 1.0:
            raise ValueError("backoff_factor must be >= 1.0")
        
        logger.info("Configuration validation passed")
    
    def display(self) -> None:
        """Display configuration with sensitive data masked.
        
        Prints all configuration parameters in a readable format,
        masking the API key for security.
        """
        print("\n" + "="*60)
        print("Configuration Settings")
        print("="*60)
        
        print("\nAPI Configuration:")
        print(f"  API Key: {self._mask_api_key(self.api_key)}")
        print(f"  Model Name: {self.model_name}")
        
        print("\nModel Parameters:")
        print(f"  Temperature: {self.temperature}")
        print(f"  Max Tokens: {self.max_tokens}")
        
        print("\nProcessing Configuration:")
        print(f"  Batch Size: {self.batch_size}")
        print(f"  Max Reviews: {self.max_reviews if self.max_reviews else 'All'}")
        print(f"  Rate Limit Delay: {self.rate_limit_delay}s")
        
        print("\nFile Configuration:")
        print(f"  Data File: {self.data_file}")
        print(f"  Output Directory: {self.output_dir}")
        
        print("\nRetry Configuration:")
        print(f"  Max Retries: {self.max_retries}")
        print(f"  Backoff Factor: {self.backoff_factor}")
        
        print("="*60 + "\n")
    
    @staticmethod
    def _mask_api_key(api_key: str) -> str:
        """Mask API key for secure display.
        
        Args:
            api_key: The API key to mask
        
        Returns:
            Masked API key showing only first 4 and last 4 characters
        """
        if len(api_key) <= 8:
            return "*" * len(api_key)
        
        return f"{api_key[:4]}...{api_key[-4:]}"
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert configuration to dictionary.
        
        Returns:
            Dictionary representation of configuration (with masked API key)
        """
        return {
            "api_key": self._mask_api_key(self.api_key),
            "model_name": self.model_name,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "batch_size": self.batch_size,
            "max_reviews": self.max_reviews,
            "rate_limit_delay": self.rate_limit_delay,
            "data_file": self.data_file,
            "output_dir": self.output_dir,
            "max_retries": self.max_retries,
            "backoff_factor": self.backoff_factor
        }

logger.info("Config dataclass defined successfully")

### Secure API Key Input

Securely obtain the OpenAI API key from environment variables or user input.

In [None]:
def get_api_key() -> str:
    """Securely obtain OpenAI API key.
    
    Attempts to load API key from environment variable first.
    If not found, prompts user for secure input.
    
    Returns:
        OpenAI API key
    
    Raises:
        ValueError: If API key is not provided or is empty
    """
    # Try to get from environment variable
    api_key = os.getenv("OPENAI_API_KEY")
    
    if api_key and len(api_key.strip()) > 0:
        logger.info("API key loaded from environment variable")
        return api_key.strip()
    
    # If not in environment, prompt user
    logger.info("API key not found in environment, prompting user")
    print("\nOpenAI API key not found in environment variables.")
    print("Please enter your OpenAI API key (input will be hidden):")
    
    api_key = getpass.getpass("API Key: ")
    
    if not api_key or len(api_key.strip()) == 0:
        raise ValueError("API key cannot be empty")
    
    logger.info("API key obtained from user input")
    return api_key.strip()

logger.info("API key input function defined")

### Initialize Configuration

Create a configuration instance with default or custom parameters.

In [None]:
# Get API key securely
api_key = get_api_key()

# Create configuration with default settings
config = Config(
    api_key=api_key,
    model_name="gpt-3.5-turbo",  # Use GPT-3.5-turbo for cost efficiency
    temperature=0.0,              # Deterministic responses for consistency
    max_tokens=10,                # Short responses (just sentiment label)
    batch_size=100,               # Process 100 reviews at a time
    rate_limit_delay=0.5,         # 0.5 second delay between requests
    max_reviews=None,             # Process all reviews (set to number for testing)
    data_file="paris-2015-09-02-reviews.csv",  # Default to Paris dataset
    output_dir="results/",        # Save results to results directory
    max_retries=3,                # Retry failed requests up to 3 times
    backoff_factor=2.0            # Double delay on each retry
)

# Display configuration (with masked API key)
config.display()

logger.info("Configuration initialized successfully")

### Configuration Notes

**Model Selection:**
- `gpt-3.5-turbo`: Fast and cost-effective for sentiment analysis
- `gpt-4`: More accurate but significantly more expensive

**Temperature:**
- Set to 0.0 for deterministic, consistent sentiment predictions
- Higher values (0.5-1.0) introduce randomness

**Rate Limiting:**
- Adjust `rate_limit_delay` based on your API tier
- Free tier: 3 requests/minute → use 20+ second delay
- Paid tier: Higher limits → can reduce delay
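
The delay for a given tier follows from its request quota: the minimum spacing is 60 seconds divided by the allowed requests per minute. A minimal sketch, where `min_delay_for_rpm` and its `safety_margin` parameter are hypothetical helpers for illustration and not part of the notebook:

```python
def min_delay_for_rpm(requests_per_minute: float, safety_margin: float = 1.0) -> float:
    """Return the seconds to wait between requests to stay under a per-minute quota.

    Hypothetical helper: a safety_margin above 1.0 pads the delay to leave headroom.
    """
    if requests_per_minute <= 0:
        raise ValueError("requests_per_minute must be positive")
    return (60.0 / requests_per_minute) * safety_margin


# A quota of 3 requests/minute needs at least 20 seconds between requests
print(min_delay_for_rpm(3))  # 20.0

# A higher quota (value chosen only for illustration) allows a shorter delay
print(min_delay_for_rpm(60))  # 1.0
```

The result can be passed as `rate_limit_delay` when constructing `Config`.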

**Testing:**
- Set `max_reviews` to a small number (e.g., 50) for initial testing
- Set to `None` to process entire dataset

**Cost Optimization:**
- Use `gpt-3.5-turbo` instead of `gpt-4` (roughly 10-20x cheaper per token; check current pricing before a large run)
- Keep `max_tokens` low (sentiment labels are short)
- Sample large datasets instead of processing all reviews
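
Cost follows from token counts: tokens divided by 1,000, multiplied by the per-1K-token price, summed over input and output. A rough sketch of that arithmetic; `estimate_cost` is a hypothetical helper, and the prices used below are placeholders for illustration, not actual OpenAI rates:

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  input_price_per_1k: float, output_price_per_1k: float) -> float:
    """Estimate API cost in dollars from token counts and per-1K-token prices."""
    input_cost = (input_tokens / 1000) * input_price_per_1k
    output_cost = (output_tokens / 1000) * output_price_per_1k
    return input_cost + output_cost


# Placeholder prices (NOT real rates): $1.00 per 1K input tokens, $2.00 per 1K output tokens
cost = estimate_cost(input_tokens=200_000, output_tokens=2_000,
                     input_price_per_1k=1.00, output_price_per_1k=2.00)
print(f"Estimated cost: ${cost:.2f}")
```

In this notebook the system message and few-shot examples are resent with every review, so input tokens are likely to dominate the total.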

---
## 3. Data Loading <a id='data-loading'></a>

Load and preprocess Airbnb review data from CSV files.

### ReviewDataLoader Class

Create a class to handle loading and preprocessing of review data.

In [None]:
class ReviewDataLoader:
    """Loads and preprocesses Airbnb review data from CSV files.
    
    This class handles:
    - Loading CSV files with error handling
    - Extracting and filtering comment columns
    - Validating data structure
    - Providing sample records for inspection
    """
    
    def __init__(self):
        """Initialize the ReviewDataLoader."""
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("ReviewDataLoader initialized")
    
    def load_reviews(self, file_path: str, max_rows: Optional[int] = None) -> pd.DataFrame:
        """Load reviews from CSV file.
        
        Args:
            file_path: Path to CSV file containing reviews
            max_rows: Maximum number of rows to load (None for all)
        
        Returns:
            DataFrame containing review data
        
        Raises:
            FileNotFoundError: If file does not exist
            ValueError: If file is empty or invalid
        """
        self.logger.info(f"Loading reviews from: {file_path}")
        
        # Check if file exists
        if not os.path.exists(file_path):
            error_msg = f"File not found: {file_path}"
            self.logger.error(error_msg)
            raise FileNotFoundError(error_msg)
        
        try:
            # Load CSV file
            if max_rows is not None:
                df = pd.read_csv(file_path, nrows=max_rows)
                self.logger.info(f"Loaded {len(df)} rows (limited to {max_rows})")
            else:
                df = pd.read_csv(file_path)
                self.logger.info(f"Loaded {len(df)} rows")
            
            # Check if DataFrame is empty
            if df.empty:
                error_msg = f"File is empty: {file_path}"
                self.logger.error(error_msg)
                raise ValueError(error_msg)
            
            # Validate data structure
            if not self.validate_data(df):
                error_msg = f"Invalid data structure in file: {file_path}"
                self.logger.error(error_msg)
                raise ValueError(error_msg)
            
            self.logger.info(f"Successfully loaded {len(df)} reviews")
            self.logger.info(f"Columns: {list(df.columns)}")
            
            return df
            
        except pd.errors.EmptyDataError:
            error_msg = f"File is empty or invalid: {file_path}"
            self.logger.error(error_msg)
            raise ValueError(error_msg)
        except pd.errors.ParserError as e:
            error_msg = f"Error parsing CSV file: {e}"
            self.logger.error(error_msg)
            raise ValueError(error_msg)
        except ValueError:
            # Raised above for an empty or structurally invalid file; already logged
            raise
        except Exception as e:
            error_msg = f"Unexpected error loading file: {e}"
            self.logger.error(error_msg)
            raise
    
    def extract_comments(self, df: pd.DataFrame) -> List[str]:
        """Extract comment column and filter out missing values.
        
        Args:
            df: DataFrame containing review data
        
        Returns:
            List of non-null comment strings
        
        Raises:
            ValueError: If 'comments' column does not exist
        """
        self.logger.info("Extracting comments from DataFrame")
        
        # Check if comments column exists
        if 'comments' not in df.columns:
            error_msg = "'comments' column not found in DataFrame"
            self.logger.error(error_msg)
            raise ValueError(error_msg)
        
        # Get total count before filtering
        total_count = len(df)
        
        # Extract comments and filter null/empty values
        comments = df['comments'].dropna()
        
        # Convert to string and filter empty strings
        comments = comments.astype(str)
        comments = comments[comments.str.strip() != '']
        
        # Convert to list
        comment_list = comments.tolist()
        
        filtered_count = total_count - len(comment_list)
        self.logger.info(f"Extracted {len(comment_list)} comments")
        self.logger.info(f"Filtered out {filtered_count} null/empty comments")
        
        return comment_list
    
    def validate_data(self, df: pd.DataFrame) -> bool:
        """Validate that required columns exist in DataFrame.
        
        Args:
            df: DataFrame to validate
        
        Returns:
            True if data is valid, False otherwise
        """
        self.logger.info("Validating DataFrame structure")
        
        # Check if 'comments' column exists
        if 'comments' not in df.columns:
            self.logger.warning("'comments' column not found")
            return False
        
        self.logger.info("DataFrame validation passed")
        return True
    
    def get_sample(self, df: pd.DataFrame, n: int = 5) -> pd.DataFrame:
        """Get sample records for display.
        
        Args:
            df: DataFrame to sample from
            n: Number of samples to return
        
        Returns:
            DataFrame with n sample records
        """
        self.logger.info(f"Getting {n} sample records")
        
        # Return min of n or total rows
        sample_size = min(n, len(df))
        sample = df.head(sample_size)
        
        self.logger.info(f"Returning {len(sample)} sample records")
        return sample

logger.info("ReviewDataLoader class defined successfully")

### Load Review Data

Load the review dataset and display sample records.

In [None]:
# Initialize data loader
data_loader = ReviewDataLoader()

# Load reviews from configured file
reviews_df = data_loader.load_reviews(config.data_file, config.max_reviews)

# Display basic statistics
print("\n" + "="*60)
print("Dataset Statistics")
print("="*60)
print(f"Total reviews loaded: {len(reviews_df)}")
print(f"Columns: {list(reviews_df.columns)}")
print(f"Memory usage: {reviews_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("="*60 + "\n")

logger.info(f"Loaded {len(reviews_df)} reviews from {config.data_file}")

### Display Sample Reviews

Show a few sample reviews to understand the data structure.

In [None]:
# Get sample records
sample_df = data_loader.get_sample(reviews_df, n=5)

print("\nSample Reviews:")
print("="*60)
display(sample_df[['comments']].head())

logger.info("Sample reviews displayed")

### Extract Comments

Extract and filter comment text for sentiment analysis.

In [None]:
# Extract comments
comments = data_loader.extract_comments(reviews_df)

print("\n" + "="*60)
print("Comment Extraction Results")
print("="*60)
print(f"Total comments extracted: {len(comments)}")
print(f"Average comment length: {np.mean([len(c) for c in comments]):.1f} characters")
print(f"Shortest comment: {min([len(c) for c in comments])} characters")
print(f"Longest comment: {max([len(c) for c in comments])} characters")
print("="*60 + "\n")

# Display a few sample comments
print("Sample Comments:")
for i, comment in enumerate(comments[:3], 1):
    print(f"\n{i}. {comment[:200]}..." if len(comment) > 200 else f"\n{i}. {comment}")

logger.info(f"Extracted {len(comments)} comments for analysis")

---
## 4. Prompt Engineering <a id='prompt-engineering'></a>

Design and implement prompts for sentiment classification using OpenAI LLMs.

### PromptEngine Class

Create a class to manage prompt construction and response validation.

In [None]:
class PromptEngine:
    """Manages prompt construction for sentiment analysis.
    
    This class handles:
    - System message definition
    - Few-shot example management
    - Prompt construction
    - Response validation
    """
    
    def __init__(self, few_shot_examples: Optional[List[Dict]] = None):
        """Initialize the PromptEngine.
        
        Args:
            few_shot_examples: Optional list of example dictionaries with 'comment' and 'sentiment' keys
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # Use default examples if none provided
        if few_shot_examples is None:
            self.few_shot_examples = self._get_default_examples()
        else:
            self.few_shot_examples = few_shot_examples
        
        self.logger.info(f"PromptEngine initialized with {len(self.few_shot_examples)} few-shot examples")
    
    def _get_default_examples(self) -> List[Dict[str, Any]]:
        """Get default few-shot examples.
        
        Returns:
            List of example dictionaries
        """
        return [
            {
                "comment": "The apartment was amazing! Great location and very clean. The host was super responsive and helpful. Would definitely stay here again!",
                "sentiment": 1
            },
            {
                "comment": "The place was okay, nothing special. It served its purpose for a short stay.",
                "sentiment": 0
            },
            {
                "comment": "Terrible experience. The apartment was dirty and the host was unresponsive. The photos were misleading. Would not recommend.",
                "sentiment": -1
            }
        ]
    
    def get_system_message(self) -> str:
        """Get system message defining the task.
        
        Returns:
            System message string
        """
        return (
            "You are a sentiment analysis expert. Your task is to analyze Airbnb review comments "
            "and classify them into one of three sentiment categories:\n\n"
            "- Positive (1): The review expresses satisfaction, praise, or positive experiences\n"
            "- Neutral (0): The review is balanced, factual, or neither clearly positive nor negative\n"
            "- Negative (-1): The review expresses dissatisfaction, complaints, or negative experiences\n\n"
            "Respond with ONLY the sentiment label: 1, 0, or -1. Do not include any explanation or additional text."
        )
    
    def get_few_shot_examples(self) -> str:
        """Format few-shot examples for the prompt.
        
        Returns:
            Formatted few-shot examples string
        """
        examples_text = "Here are some examples:\n\n"
        
        for i, example in enumerate(self.few_shot_examples, 1):
            examples_text += f"Example {i}:\n"
            examples_text += f"Comment: {example['comment']}\n"
            examples_text += f"Sentiment: {example['sentiment']}\n\n"
        
        return examples_text
    
    def build_prompt(self, comment: str) -> List[Dict[str, str]]:
        """Construct complete prompt for a comment.
        
        Args:
            comment: Review comment to analyze
        
        Returns:
            List of message dictionaries for OpenAI API
        """
        messages = [
            {
                "role": "system",
                "content": self.get_system_message()
            },
            {
                "role": "user",
                "content": self.get_few_shot_examples() + f"Now analyze this comment:\n\nComment: {comment}\n\nSentiment:"
            }
        ]
        
        return messages
    
    def validate_response(self, response: str) -> bool:
        """Validate that response is a valid sentiment label.
        
        Args:
            response: Response string from LLM
        
        Returns:
            True if response is valid (-1, 0, or 1), False otherwise
        """
        # Strip whitespace and try to convert to int
        try:
            sentiment = int(response.strip())
            return sentiment in [-1, 0, 1]
        except (ValueError, AttributeError):
            return False
    
    def parse_sentiment(self, response: str) -> int:
        """Parse sentiment from response string.
        
        Args:
            response: Response string from LLM
        
        Returns:
            Sentiment value (-1, 0, or 1)
        
        Raises:
            ValueError: If response cannot be parsed as valid sentiment
        """
        try:
            sentiment = int(response.strip())
            if sentiment not in [-1, 0, 1]:
                raise ValueError(f"Invalid sentiment value: {sentiment}")
            return sentiment
        except (ValueError, AttributeError) as e:
            self.logger.warning(f"Failed to parse sentiment from response: {response}")
            raise ValueError(f"Invalid sentiment response: {response}") from e

logger.info("PromptEngine class defined successfully")
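
`validate_response` and `parse_sentiment` accept only a bare integer, so a reply such as `Sentiment: 1` is rejected. If that proves too strict, one option is to pull the first standalone label out of the text. The sketch below is an alternative, not the notebook's method; `extract_sentiment_label` is a hypothetical name:

```python
import re
from typing import Optional

# Match -1, 0, or 1 (optionally written +1) that is not part of a longer number or word
_LABEL_PATTERN = re.compile(r"(?<![\w.\-+])([-+]?1|0)(?!\w|\.\d)")


def extract_sentiment_label(response: str) -> Optional[int]:
    """Return the first standalone sentiment label in the text, or None if absent."""
    match = _LABEL_PATTERN.search(response)
    return int(match.group(1)) if match else None


for reply in ["1", "Sentiment: -1", "The label is 0.", "10", "positive"]:
    print(repr(reply), "->", extract_sentiment_label(reply))
```

Lenient parsing can also pick up a digit that was never meant as a label, so treating `None` as a failure and logging the raw reply may be safer than silently defaulting to neutral.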

### Initialize Prompt Engine

Create a prompt engine instance with default few-shot examples.

In [None]:
# Initialize prompt engine
prompt_engine = PromptEngine()

logger.info("Prompt engine initialized")

### Display Prompt Structure

Show the system message and few-shot examples.

In [None]:
print("\n" + "="*60)
print("Prompt Structure")
print("="*60)

print("\nSystem Message:")
print("-" * 60)
print(prompt_engine.get_system_message())

print("\n" + "="*60)
print("Few-Shot Examples:")
print("="*60)
print(prompt_engine.get_few_shot_examples())

logger.info("Prompt structure displayed")

### Test Prompt Construction

Build a sample prompt to verify the structure.

In [None]:
# Test with a sample comment
test_comment = "The location was perfect and the apartment was clean. However, the wifi was slow."

# Build prompt
test_messages = prompt_engine.build_prompt(test_comment)

print("\n" + "="*60)
print("Sample Prompt Construction")
print("="*60)
print(f"\nTest Comment: {test_comment}")
print("\nConstructed Messages:")
for i, msg in enumerate(test_messages, 1):
    print(f"\nMessage {i} ({msg['role']}):")
    print("-" * 60)
    print(msg['content'][:300] + "..." if len(msg['content']) > 300 else msg['content'])

logger.info("Prompt construction test completed")
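
`build_prompt` places the few-shot examples inside a single user message. A common alternative with chat models is to present each example as a user/assistant message pair, so the model sees the expected reply format as prior turns. A minimal sketch of that variant, assuming the same `comment`/`sentiment` example dictionaries; `build_chat_few_shot` is a hypothetical function, not part of the notebook:

```python
from typing import Any, Dict, List


def build_chat_few_shot(system_message: str,
                        examples: List[Dict[str, Any]],
                        comment: str) -> List[Dict[str, str]]:
    """Build chat messages with each few-shot example as a user/assistant turn."""
    messages = [{"role": "system", "content": system_message}]
    for example in examples:
        messages.append({"role": "user", "content": f"Comment: {example['comment']}"})
        # The assistant turn shows exactly the bare label expected back
        messages.append({"role": "assistant", "content": str(example["sentiment"])})
    # The comment to classify goes last, in the same format as the examples
    messages.append({"role": "user", "content": f"Comment: {comment}"})
    return messages


examples = [
    {"comment": "Great location and very clean.", "sentiment": 1},
    {"comment": "Dirty apartment, unresponsive host.", "sentiment": -1},
]
messages = build_chat_few_shot("Classify the sentiment as 1, 0, or -1.", examples, "It was okay.")
for message in messages:
    print(f"{message['role']}: {message['content']}")
```

Whether this layout improves accuracy over the single-message prompt would need to be measured on labeled reviews.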

---
## 5. Sentiment Prediction <a id='sentiment-prediction'></a>

Implement OpenAI API integration for sentiment prediction.

### OpenAIClient Class

Create a wrapper class for OpenAI API with error handling and response parsing.

In [None]:
class OpenAIClient:
    """Wrapper for OpenAI API with error handling and rate limiting.
    
    This class handles:
    - API authentication and initialization
    - Single and batch sentiment predictions
    - Response parsing and validation
    - Error handling with retries
    """
    
    def __init__(self, api_key: str, model: str, config: Config):
        """Initialize OpenAI client.
        
        Args:
            api_key: OpenAI API key
            model: Model name (e.g., 'gpt-3.5-turbo')
            config: Configuration object
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.config = config
        
        self.logger.info(f"OpenAIClient initialized with model: {model}")
    
    def predict_sentiment(self, comment: str, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Predict sentiment for a single comment.
        
        Args:
            comment: Review comment to analyze
            messages: Formatted messages for API
        
        Returns:
            Dictionary with sentiment, tokens, and metadata
        """
        retry_count = 0
        last_error = None
        
        while retry_count <= self.config.max_retries:
            try:
                # Make API call
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    temperature=self.config.temperature,
                    max_tokens=self.config.max_tokens
                )
                
                # Parse response
                sentiment, input_tokens, output_tokens = self._parse_response(response)
                
                return {
                    "comment": comment,
                    "sentiment": sentiment,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "error": None
                }
                
            except RateLimitError as e:
                last_error = e
                retry_count += 1
                if retry_count <= self.config.max_retries:
                    delay = self._calculate_backoff(retry_count)
                    self.logger.warning(f"Rate limit hit. Retry {retry_count}/{self.config.max_retries} after {delay}s")
                    time.sleep(delay)
                else:
                    self.logger.error(f"Max retries exceeded for rate limit: {e}")
                    
            except (APIError, APIConnectionError) as e:  # timeouts (APITimeoutError) subclass APIConnectionError
                last_error = e
                retry_count += 1
                if retry_count <= self.config.max_retries:
                    delay = self._calculate_backoff(retry_count)
                    self.logger.warning(f"API error. Retry {retry_count}/{self.config.max_retries} after {delay}s: {e}")
                    time.sleep(delay)
                else:
                    self.logger.error(f"Max retries exceeded for API error: {e}")
                    
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                last_error = e
                break
        
        # Return error result if all retries failed
        return {
            "comment": comment,
            "sentiment": 0,  # Default to neutral
            "input_tokens": 0,
            "output_tokens": 0,
            "error": str(last_error)
        }
    
    def _parse_response(self, response: Any) -> Tuple[int, int, int]:
        """Extract sentiment and token counts from API response.
        
        If the response text cannot be parsed as a valid label, the sentiment
        falls back to neutral (0); token counts are still reported when available.
        
        Args:
            response: OpenAI API response object
        
        Returns:
            Tuple of (sentiment, input_tokens, output_tokens)
        """
        # Read token usage first so billed tokens are still counted when parsing fails
        usage = getattr(response, "usage", None)
        input_tokens = getattr(usage, "prompt_tokens", 0) or 0
        output_tokens = getattr(usage, "completion_tokens", 0) or 0
        
        try:
            # Extract response text
            response_text = response.choices[0].message.content.strip()
            
            # Parse sentiment
            sentiment = int(response_text)
            
            # Validate sentiment
            if sentiment not in [-1, 0, 1]:
                self.logger.warning(f"Invalid sentiment value: {sentiment}, defaulting to 0")
                sentiment = 0
            
            return sentiment, input_tokens, output_tokens
            
        except (ValueError, AttributeError, IndexError) as e:
            self.logger.warning(f"Failed to parse response, defaulting to neutral: {e}")
            # Default to neutral sentiment if parsing fails
            return 0, input_tokens, output_tokens
    
    def _calculate_backoff(self, retry_count: int) -> float:
        """Calculate exponential backoff delay.
        
        Args:
            retry_count: Current retry attempt number
        
        Returns:
            Delay in seconds
        """
        base_delay = self.config.rate_limit_delay
        delay = base_delay * (self.config.backoff_factor ** (retry_count - 1))
        return delay

logger.info("OpenAIClient class defined successfully")
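
To make the retry schedule concrete, the sketch below reproduces the formula from `_calculate_backoff` as a standalone function. The defaults `base_delay=0.5` and `backoff_factor=2.0` are assumptions chosen for illustration; in the notebook these values come from `config.rate_limit_delay` and `config.backoff_factor`.

```python
def backoff_delay(retry_count: int, base_delay: float = 0.5,
                  backoff_factor: float = 2.0) -> float:
    """Delay before retry number `retry_count` (1-based), mirroring _calculate_backoff."""
    return base_delay * (backoff_factor ** (retry_count - 1))

# Delays for the first four retries under the assumed settings
schedule = [backoff_delay(n) for n in range(1, 5)]
print(schedule)  # [0.5, 1.0, 2.0, 4.0]
```

Under these assumed settings, the first four retries add up to 7.5 seconds of waiting.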

### Initialize OpenAI Client

Create an OpenAI client instance with the configured API key and model.

In [None]:
# Initialize OpenAI client
openai_client = OpenAIClient(
    api_key=config.api_key,
    model=config.model_name,
    config=config
)

logger.info("OpenAI client initialized")

### Test Single Prediction

Test the sentiment prediction with a sample comment.

In [None]:
# Test with a sample comment
test_comment = "The apartment was clean and the host was friendly. Great experience overall!"

# Build prompt
test_messages = prompt_engine.build_prompt(test_comment)

# Predict sentiment
print("\n" + "="*60)
print("Testing Single Prediction")
print("="*60)
print(f"\nComment: {test_comment}")
print("\nMaking API call...")

result = openai_client.predict_sentiment(test_comment, test_messages)

print("\nResult:")
print(f"  Sentiment: {result['sentiment']}")
print(f"  Input Tokens: {result['input_tokens']}")
print(f"  Output Tokens: {result['output_tokens']}")
print(f"  Error: {result['error']}")
print("="*60 + "\n")

logger.info("Single prediction test completed")
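
If the test returns a neutral `0` with no error, one possible cause is that the model replied with something other than a bare integer (for example `Sentiment: 1.`), which `_parse_response` maps to `0`. A more tolerant parser might look like the sketch below. The helper name `parse_sentiment_label` is hypothetical and not part of the notebook, and the regex is a heuristic that takes the first integer in the reply.

```python
import re
from typing import Optional

VALID_SENTIMENTS = (-1, 0, 1)

def parse_sentiment_label(text: Optional[str], default: int = 0) -> int:
    """Pull the first integer out of a model reply; fall back to `default`."""
    match = re.search(r"-?\d+", text or "")
    if match is None:
        return default
    value = int(match.group())
    # Out-of-range values are treated the same as unparseable replies
    return value if value in VALID_SENTIMENTS else default

for reply in ["1", " -1\n", "Sentiment: 1.", "positive", "5"]:
    print(repr(reply), "->", parse_sentiment_label(reply))
```

Tightening the prompt so the model returns only the integer remains the more reliable fix; a tolerant parser only reduces the number of silent fallbacks.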

### Batch Prediction Function

Implement batch processing with progress tracking and error resilience.

In [None]:
def batch_predict(comments: List[str], client: OpenAIClient, prompt_engine: PromptEngine, 
                  rate_limit_delay: float = 0.5) -> List[Dict[str, Any]]:
    """Predict sentiment for multiple comments with progress tracking.
    
    Args:
        comments: List of review comments
        client: OpenAI client instance
        prompt_engine: Prompt engine instance
        rate_limit_delay: Delay between requests in seconds
    
    Returns:
        List of prediction result dictionaries
    """
    results = []
    total = len(comments)
    
    logger.info(f"Starting batch prediction for {total} comments")
    print(f"\nProcessing {total} comments...")
    
    start_time = time.time()
    
    for i, comment in enumerate(comments, 1):
        try:
            # Build prompt
            messages = prompt_engine.build_prompt(comment)
            
            # Predict sentiment
            result = client.predict_sentiment(comment, messages)
            results.append(result)
            
            # Progress indicator
            if i % 10 == 0 or i == total:
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                remaining = (total - i) / rate if rate > 0 else 0
                print(f"Progress: {i}/{total} ({i/total*100:.1f}%) - "
                      f"Rate: {rate:.1f} req/s - "
                      f"ETA: {remaining:.0f}s")
            
            # Rate limiting
            if i < total:  # Don't delay after last request
                time.sleep(rate_limit_delay)
                
        except Exception as e:
            logger.error(f"Error processing comment {i}: {e}")
            # Add error result and continue
            results.append({
                "comment": comment,
                "sentiment": 0,
                "input_tokens": 0,
                "output_tokens": 0,
                "error": str(e)
            })
    
    elapsed = time.time() - start_time
    logger.info(f"Batch prediction completed in {elapsed:.1f}s")
    print(f"\nCompleted in {elapsed:.1f}s")
    
    return results

logger.info("Batch prediction function defined")
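
The progress line in `batch_predict` derives its rate and ETA from two divisions. The sketch below isolates that arithmetic in a hypothetical helper, `estimate_progress`, with made-up numbers (30 of 120 comments done after 45 seconds).

```python
from typing import Tuple

def estimate_progress(done: int, total: int, elapsed: float) -> Tuple[float, float]:
    """Return (requests per second, estimated seconds remaining)."""
    rate = done / elapsed if elapsed > 0 else 0.0
    remaining = (total - done) / rate if rate > 0 else 0.0
    return rate, remaining

rate, eta = estimate_progress(done=30, total=120, elapsed=45.0)
print(f"Rate: {rate:.2f} req/s - ETA: {eta:.0f}s")  # Rate: 0.67 req/s - ETA: 135s
```

Because the elapsed time includes the `time.sleep(rate_limit_delay)` pauses, the reported rate cannot meaningfully exceed `1 / rate_limit_delay` (2 req/s at the default of 0.5 seconds).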

### Run Batch Prediction

Process all comments and collect results.

In [None]:
# Run batch prediction
print("\n" + "="*60)
print("Batch Sentiment Prediction")
print("="*60)

prediction_results = batch_predict(
    comments=comments,
    client=openai_client,
    prompt_engine=prompt_engine,
    rate_limit_delay=config.rate_limit_delay
)

print("="*60 + "\n")

logger.info(f"Collected {len(prediction_results)} prediction results")

### Convert Results to DataFrame

Structure the prediction results for analysis.

In [None]:
# Convert results to DataFrame
results_df = pd.DataFrame(prediction_results)

# Display summary
print("\n" + "="*60)
print("Prediction Results Summary")
print("="*60)
print(f"Total predictions: {len(results_df)}")
print(f"Successful predictions: {results_df['error'].isna().sum()}")
print(f"Failed predictions: {results_df['error'].notna().sum()}")
print("="*60 + "\n")

# Display first few results
print("Sample Results:")
display(results_df[['comment', 'sentiment', 'input_tokens', 'output_tokens']].head(10))

logger.info("Results converted to DataFrame")
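
Failed requests are stored with `sentiment` set to `0`, so they look like neutral reviews unless the `error` field is checked. The sketch below shows the difference on a few hypothetical result dictionaries in the same shape that `predict_sentiment` returns; it uses only the standard library.

```python
from collections import Counter
from typing import Any, Dict, List

# Hypothetical results for illustration
sample_results: List[Dict[str, Any]] = [
    {"comment": "Great stay", "sentiment": 1, "error": None},
    {"comment": "It was fine", "sentiment": 0, "error": None},
    {"comment": "Dirty room", "sentiment": -1, "error": None},
    {"comment": "Lovely host", "sentiment": 0, "error": "Rate limit exceeded"},
]

succeeded = [r for r in sample_results if r["error"] is None]
failed = [r for r in sample_results if r["error"] is not None]

all_counts = Counter(r["sentiment"] for r in sample_results)
clean_counts = Counter(r["sentiment"] for r in succeeded)

print(f"Neutral including failures: {all_counts[0]}")
print(f"Neutral excluding failures: {clean_counts[0]}")
print(f"Failed requests to re-run: {len(failed)}")
```

On the DataFrame, the equivalent filter is `results_df[results_df['error'].isna()]`.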

---
## 7. Cost Tracking <a id='cost-tracking'></a>

Track token usage and estimate API costs.

### CostTracker Class

Create a class to track token usage and calculate costs.

In [None]:
class CostTracker:
    """Tracks token usage and estimates API costs.
    
    This class handles:
    - Recording token usage per request
    - Aggregating total tokens
    - Calculating costs based on model pricing
    - Displaying usage summaries
    """
    
    def __init__(self, model_name: str):
        """Initialize cost tracker with model-specific pricing.
        
        Args:
            model_name: Name of the OpenAI model
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.model_name = model_name
        
        # Pricing per 1K tokens (as of 2024)
        self.pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "gpt-3.5-turbo-16k": {"input": 0.003, "output": 0.004}
        }
        
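        # Get pricing for model (fall back to gpt-3.5-turbo rates if not found)
        if model_name not in self.pricing:
            self.logger.warning(
                f"No pricing entry for model '{model_name}'; using gpt-3.5-turbo rates"
            )
        self.model_pricing = self.pricing.get(model_name, self.pricing["gpt-3.5-turbo"])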
        
        # Initialize counters
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.request_count = 0
        
        self.logger.info(f"CostTracker initialized for model: {model_name}")
    
    def add_request(self, input_tokens: int, output_tokens: int) -> None:
        """Record token usage for a request.
        
        Args:
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens
        """
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.request_count += 1
    
    def add_results(self, results: List[Dict[str, Any]]) -> None:
        """Add token usage from a list of results.
        
        Args:
            results: List of prediction result dictionaries
        """
        for result in results:
            if result.get('error') is None:
                self.add_request(
                    result.get('input_tokens', 0),
                    result.get('output_tokens', 0)
                )
    
    def get_total_tokens(self) -> Dict[str, int]:
        """Get total input and output tokens.
        
        Returns:
            Dictionary with input, output, and total tokens
        """
        return {
            "input_tokens": self.total_input_tokens,
            "output_tokens": self.total_output_tokens,
            "total_tokens": self.total_input_tokens + self.total_output_tokens
        }
    
    def estimate_cost(self) -> float:
        """Calculate estimated cost in USD.
        
        Returns:
            Estimated cost in dollars
        """
        input_cost = (self.total_input_tokens / 1000) * self.model_pricing["input"]
        output_cost = (self.total_output_tokens / 1000) * self.model_pricing["output"]
        return input_cost + output_cost
    
    def display_summary(self) -> None:
        """Display usage and cost summary."""
        tokens = self.get_total_tokens()
        cost = self.estimate_cost()
        
        print("\n" + "="*60)
        print("Cost and Usage Summary")
        print("="*60)
        
        print(f"\nModel: {self.model_name}")
        print(f"Total Requests: {self.request_count}")
        
        print("\nToken Usage:")
        print(f"  Input Tokens: {tokens['input_tokens']:,}")
        print(f"  Output Tokens: {tokens['output_tokens']:,}")
        print(f"  Total Tokens: {tokens['total_tokens']:,}")
        
        print("\nPricing (per 1K tokens):")
        print(f"  Input: ${self.model_pricing['input']:.4f}")
        print(f"  Output: ${self.model_pricing['output']:.4f}")
        
        print("\nEstimated Cost:")
        print(f"  Input Cost: ${(tokens['input_tokens'] / 1000) * self.model_pricing['input']:.4f}")
        print(f"  Output Cost: ${(tokens['output_tokens'] / 1000) * self.model_pricing['output']:.4f}")
        print(f"  Total Cost: ${cost:.4f}")
        
        if self.request_count > 0:
            avg_cost = cost / self.request_count
            print(f"\nAverage Cost per Request: ${avg_cost:.6f}")
        
        print("="*60 + "\n")

logger.info("CostTracker class defined successfully")
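
As a worked example of the arithmetic in `estimate_cost`, the sketch below applies the per-1K-token rates from the pricing table to an assumed workload of 1,000 reviews with roughly 150 prompt tokens and 1 completion token each. The workload figures are illustrative, not measured.

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  input_rate: float, output_rate: float) -> float:
    """Cost in USD, with rates expressed per 1K tokens."""
    return (input_tokens / 1000) * input_rate + (output_tokens / 1000) * output_rate

# Assumed workload: 1,000 reviews, ~150 prompt tokens and 1 completion token each
input_tokens, output_tokens = 150_000, 1_000

cost_gpt35 = estimate_cost(input_tokens, output_tokens, 0.0015, 0.002)
cost_gpt4 = estimate_cost(input_tokens, output_tokens, 0.03, 0.06)

print(f"gpt-3.5-turbo: ${cost_gpt35:.4f}")  # gpt-3.5-turbo: $0.2270
print(f"gpt-4:         ${cost_gpt4:.4f}")   # gpt-4:         $4.5600
```

Because this workload is dominated by input tokens, the gap between the two models is close to the 20x difference in their input rates.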

### Track Costs for Predictions

Calculate and display the cost of the batch prediction.

In [None]:
# Initialize cost tracker
cost_tracker = CostTracker(config.model_name)

# Add results to tracker
cost_tracker.add_results(prediction_results)

# Display summary
cost_tracker.display_summary()

logger.info("Cost tracking completed")

---
## 6. Results Analysis <a id='results-analysis'></a>

Analyze and visualize sentiment prediction results.

### ResultsAnalyzer Class

Create a class to analyze and visualize prediction results.

In [None]:
class ResultsAnalyzer:
    """Analyzes and visualizes sentiment prediction results.
    
    This class handles:
    - Computing sentiment distribution
    - Calculating accuracy metrics
    - Creating visualizations
    - Displaying sample predictions
    """
    
    def __init__(self):
        """Initialize the ResultsAnalyzer."""
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("ResultsAnalyzer initialized")
    
    def compute_distribution(self, results_df: pd.DataFrame) -> Dict[int, int]:
        """Compute sentiment distribution.
        
        Args:
            results_df: DataFrame with prediction results
        
        Returns:
            Dictionary mapping sentiment to count
        """
        self.logger.info("Computing sentiment distribution")
        
        distribution = results_df['sentiment'].value_counts().to_dict()
        
        # Ensure all sentiments are present
        for sentiment in [-1, 0, 1]:
            if sentiment not in distribution:
                distribution[sentiment] = 0
        
        self.logger.info(f"Distribution: {distribution}")
        return distribution
    
    def calculate_metrics(self, results_df: pd.DataFrame, 
                         ground_truth: Optional[pd.Series] = None) -> Dict[str, float]:
        """Calculate accuracy metrics if ground truth available.
        
        Args:
            results_df: DataFrame with prediction results
            ground_truth: Optional series with true sentiment labels
        
        Returns:
            Dictionary with accuracy metrics
        """
        self.logger.info("Calculating metrics")
        
        metrics = {}
        
        if ground_truth is not None:
            from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
            
            predictions = results_df['sentiment']
            
            metrics['accuracy'] = accuracy_score(ground_truth, predictions)
            metrics['precision'] = precision_score(ground_truth, predictions, average='weighted', zero_division=0)
            metrics['recall'] = recall_score(ground_truth, predictions, average='weighted', zero_division=0)
            metrics['f1'] = f1_score(ground_truth, predictions, average='weighted', zero_division=0)
            
            self.logger.info(f"Metrics: {metrics}")
        else:
            self.logger.info("No ground truth provided, skipping metrics calculation")
        
        return metrics
    
    def plot_distribution(self, distribution: Dict[int, int]) -> None:
        """Create bar chart of sentiment distribution.
        
        Args:
            distribution: Dictionary mapping sentiment to count
        """
        self.logger.info("Creating distribution plot")
        
        # Prepare data
        sentiments = [-1, 0, 1]
        labels = ['Negative', 'Neutral', 'Positive']
        counts = [distribution.get(s, 0) for s in sentiments]
        colors = ['#ef5350', '#ffa726', '#66bb6a']
        
        # Create figure
        fig, ax = plt.subplots(figsize=(10, 6))
        
        # Create bars
        bars = ax.bar(labels, counts, color=colors, alpha=0.8, edgecolor='black')
        
        # Add value labels on bars (guard against an empty distribution)
        total_count = sum(counts)
        for bar, count in zip(bars, counts):
            height = bar.get_height()
            pct = count / total_count * 100 if total_count else 0.0
            ax.text(bar.get_x() + bar.get_width()/2., height,
                   f'{count}\n({pct:.1f}%)',
                   ha='center', va='bottom', fontsize=12, fontweight='bold')
        
        # Styling
        ax.set_xlabel('Sentiment', fontsize=12, fontweight='bold')
        ax.set_ylabel('Count', fontsize=12, fontweight='bold')
        ax.set_title('Sentiment Distribution', fontsize=14, fontweight='bold', pad=20)
        ax.grid(axis='y', alpha=0.3, linestyle='--')
        
        plt.tight_layout()
        plt.show()
        
        self.logger.info("Distribution plot created")
    
    def display_samples(self, results_df: pd.DataFrame, n: int = 10) -> None:
        """Display sample predictions.
        
        Args:
            results_df: DataFrame with prediction results
            n: Number of samples to display
        """
        self.logger.info(f"Displaying {n} sample predictions")
        
        print("\n" + "="*60)
        print("Sample Predictions")
        print("="*60)
        
        sentiment_map = {-1: 'Negative', 0: 'Neutral', 1: 'Positive'}
        
        for i, row in results_df.head(n).iterrows():
            sentiment_label = sentiment_map.get(row['sentiment'], 'Unknown')
            comment = row['comment'][:150] + "..." if len(row['comment']) > 150 else row['comment']
            
            print(f"\n{i+1}. Sentiment: {sentiment_label} ({row['sentiment']})")
            print(f"   Comment: {comment}")
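            # NaN is truthy, so check explicitly (matters for results reloaded from CSV)
            error = row.get('error')
            if pd.notna(error) and error:
                print(f"   Error: {error}")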
        
        print("\n" + "="*60 + "\n")
        
        self.logger.info("Sample predictions displayed")
    
    def export_results(self, results_df: pd.DataFrame, output_path: str) -> None:
        """Export results to CSV.
        
        Args:
            results_df: DataFrame with prediction results
            output_path: Path to save CSV file
        """
        self.logger.info(f"Exporting results to: {output_path}")
        
        # Create output directory if it doesn't exist
        output_dir = os.path.dirname(output_path)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
            self.logger.info(f"Created output directory: {output_dir}")
        
        # Export to CSV
        results_df.to_csv(output_path, index=False)
        
        self.logger.info(f"Results exported successfully to {output_path}")
        print(f"\nResults saved to: {output_path}")

logger.info("ResultsAnalyzer class defined successfully")
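
`calculate_metrics` imports scikit-learn, which is not among the listed prerequisites. When it is unavailable, a rough check against ground-truth labels can be done with the standard library alone. The helpers `simple_accuracy` and `per_class_recall` below are hypothetical, and the labels are made up for illustration; this is a sketch, not a replacement for the weighted metrics above.

```python
from collections import Counter
from typing import Dict, Sequence

def simple_accuracy(y_true: Sequence[int], y_pred: Sequence[int]) -> float:
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if len(y_true) == 0:
        return 0.0
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def per_class_recall(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[int, float]:
    """Recall for each label that appears in y_true."""
    support = Counter(y_true)
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {label: hits[label] / n for label, n in support.items()}

# Hypothetical labels for illustration
y_true = [1, 1, 0, -1, -1, 1]
y_pred = [1, 0, 0, -1, 1, 1]

print(f"Accuracy: {simple_accuracy(y_true, y_pred):.3f}")
print(f"Recall by class: {per_class_recall(y_true, y_pred)}")
```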

### Analyze Results

Compute sentiment distribution and display statistics.

In [None]:
# Initialize analyzer
analyzer = ResultsAnalyzer()

# Compute distribution
distribution = analyzer.compute_distribution(results_df)

# Display distribution
print("\n" + "="*60)
print("Sentiment Distribution")
print("="*60)
total = sum(distribution.values())
denom = total or 1  # Avoid ZeroDivisionError when there are no results
print(f"\nNegative (-1): {distribution[-1]} ({distribution[-1]/denom*100:.1f}%)")
print(f"Neutral (0):   {distribution[0]} ({distribution[0]/denom*100:.1f}%)")
print(f"Positive (1):  {distribution[1]} ({distribution[1]/denom*100:.1f}%)")
print(f"\nTotal: {total}")
print("="*60 + "\n")

logger.info("Distribution analysis completed")

### Visualize Distribution

Create a bar chart showing sentiment distribution.

In [None]:
# Plot distribution
analyzer.plot_distribution(distribution)

logger.info("Distribution visualization created")

### Display Sample Predictions

Show sample predictions with original comments.

In [None]:
# Display samples
analyzer.display_samples(results_df, n=10)

logger.info("Sample predictions displayed")

---
## 8. Export Results <a id='export-results'></a>

Save prediction results to CSV for downstream analysis.

### Generate Output Filename

Create a descriptive filename with timestamp.

In [None]:
# Generate output filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dataset_name = Path(config.data_file).stem  # Strip only the trailing extension
output_filename = f"{dataset_name}_sentiment_predictions_{timestamp}.csv"
output_path = os.path.join(config.output_dir, output_filename)

print(f"\nOutput file: {output_path}")

logger.info(f"Generated output filename: {output_filename}")
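
The naming scheme can also be wrapped in a small function, which makes it testable with a fixed timestamp. The sketch below is one way to do that; `build_output_path` and the example paths are hypothetical, and it uses `pathlib.Path.stem` to drop the file extension.

```python
from datetime import datetime
from pathlib import Path

def build_output_path(data_file: str, output_dir: str, now: datetime) -> Path:
    """Build '<dataset>_sentiment_predictions_<timestamp>.csv' inside output_dir."""
    dataset_name = Path(data_file).stem  # drops only the final extension
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    return Path(output_dir) / f"{dataset_name}_sentiment_predictions_{timestamp}.csv"

# Hypothetical paths and a fixed time so the result is reproducible
path = build_output_path("reviews/amsterdam.csv", "output",
                         datetime(2024, 1, 15, 9, 30, 0))
print(path.name)  # amsterdam_sentiment_predictions_20240115_093000.csv
```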

### Export Results to CSV

Save the prediction results with all metadata.

In [None]:
# Export results
analyzer.export_results(results_df, output_path)

# Display file info
if os.path.exists(output_path):
    file_size = os.path.getsize(output_path) / 1024  # KB
    print(f"File size: {file_size:.2f} KB")
    print(f"Rows exported: {len(results_df)}")
    print(f"Columns: {list(results_df.columns)}")

logger.info("Results exported successfully")

### Verify Export (Optional)

Load the exported file to verify data integrity.

In [None]:
# Load exported file to verify
if os.path.exists(output_path):
    loaded_df = pd.read_csv(output_path)
    
    print("\n" + "="*60)
    print("Export Verification")
    print("="*60)
    print(f"Original rows: {len(results_df)}")
    print(f"Loaded rows: {len(loaded_df)}")
    print(f"Match: {len(results_df) == len(loaded_df)}")
    print("="*60 + "\n")
    
    # Display first few rows
    print("First few rows of exported file:")
    display(loaded_df.head())
    
    logger.info("Export verification completed")
else:
    logger.warning(f"Export file not found: {output_path}")
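
A matching row count does not guarantee that values survived the round trip, particularly for comments containing commas, quotes, or line breaks. The sketch below checks values as well as length. It uses the standard-library `csv` module and an in-memory buffer to stay dependency-free, and the rows are hypothetical; with pandas, the same idea would be to compare the `comment` and `sentiment` columns of `results_df` and `loaded_df`.

```python
import csv
import io

# Hypothetical rows containing characters that require CSV quoting
rows = [
    {"comment": "Great stay, would return", "sentiment": 1},
    {"comment": 'Host said "no pets", fine', "sentiment": 0},
    {"comment": "Noisy\nand cold", "sentiment": -1},
]

# Write to an in-memory buffer instead of a file
buffer = io.StringIO(newline="")
writer = csv.DictWriter(buffer, fieldnames=["comment", "sentiment"])
writer.writeheader()
writer.writerows(rows)

# Read back and compare values, not just the row count
buffer.seek(0)
loaded = list(csv.DictReader(buffer))

same_length = len(loaded) == len(rows)
same_values = all(
    orig["comment"] == back["comment"] and orig["sentiment"] == int(back["sentiment"])
    for orig, back in zip(rows, loaded)
)
print(f"Row count matches: {same_length}")
print(f"Values match: {same_values}")
```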

---
## 9. Summary <a id='summary'></a>

Summary of the sentiment analysis workflow and results.

### Dataset Selection Helper

List available datasets for easy selection.

In [None]:
def list_available_datasets(directory: str = "reviews") -> List[str]:
    """List available CSV files in the reviews directory.
    
    Args:
        directory: Directory to search for CSV files
    
    Returns:
        List of CSV filenames
    """
    if not os.path.exists(directory):
        logger.warning(f"Directory not found: {directory}")
        return []
    
    csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]
    csv_files.sort()
    
    return csv_files

# List available datasets
available_datasets = list_available_datasets()

print("\n" + "="*60)
print("Available Datasets")
print("="*60)
for i, dataset in enumerate(available_datasets, 1):
    print(f"{i}. {dataset}")
print("="*60 + "\n")

logger.info(f"Found {len(available_datasets)} available datasets")

### Final Summary

Display key findings and recommendations.

In [None]:
print("\n" + "="*60)
print("SENTIMENT ANALYSIS SUMMARY")
print("="*60)

print("\n📊 Dataset Information:")
print(f"  File: {config.data_file}")
print(f"  Total Reviews: {len(reviews_df)}")
print(f"  Comments Analyzed: {len(comments)}")

print("\n🤖 Model Configuration:")
print(f"  Model: {config.model_name}")
print(f"  Temperature: {config.temperature}")
print(f"  Max Tokens: {config.max_tokens}")

print("\n📈 Sentiment Distribution:")
total = sum(distribution.values())
denom = total or 1  # Avoid ZeroDivisionError when there are no results
print(f"  Positive: {distribution[1]} ({distribution[1]/denom*100:.1f}%)")
print(f"  Neutral: {distribution[0]} ({distribution[0]/denom*100:.1f}%)")
print(f"  Negative: {distribution[-1]} ({distribution[-1]/denom*100:.1f}%)")

print("\n💰 Cost Summary:")
tokens = cost_tracker.get_total_tokens()
cost = cost_tracker.estimate_cost()
print(f"  Total Tokens: {tokens['total_tokens']:,}")
print(f"  Estimated Cost: ${cost:.4f}")
cost_per_review = cost / len(comments) if len(comments) > 0 else 0.0
print(f"  Cost per Review: ${cost_per_review:.6f}")

print("\n💾 Output:")
print(f"  Results saved to: {output_path}")

print("\n" + "="*60)
print("✅ Analysis Complete!")
print("="*60 + "\n")

logger.info("Sentiment analysis workflow completed successfully")

### Recommendations and Next Steps

**Cost Optimization:**
- Use `gpt-3.5-turbo` for large-scale analysis (roughly 20-30x cheaper than GPT-4 at the per-token rates in the `CostTracker` pricing table)
- Sample large datasets instead of processing all reviews
- Adjust `rate_limit_delay` based on your API tier

**Accuracy Improvement:**
- Fine-tune few-shot examples for your specific domain
- Experiment with different temperature settings
- Consider using GPT-4 for higher accuracy (at higher cost)

**Production Deployment:**
- Implement caching for duplicate comments
- Add database integration for persistent storage
- Set up monitoring and alerting for API errors
- Implement batch processing with queues for large datasets

**Further Analysis:**
- Perform aspect-based sentiment analysis (location, cleanliness, host)
- Compare sentiment across different cities
- Analyze temporal trends in sentiment
- Extract key phrases from positive and negative reviews

**Model Comparison:**
- Compare results from different OpenAI models
- Benchmark against other sentiment analysis services (IBM Watson, AWS Comprehend)
- Evaluate cost vs. accuracy tradeoffs
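
As one illustration of the caching recommendation, the sketch below wraps a prediction function in an in-memory cache keyed on the normalized comment text. `CachedPredictor` and `fake_predict` are hypothetical names, and the stand-in function replaces a real API call so the example runs offline; a production cache would likely need persistence and a size limit.

```python
from typing import Callable, Dict

class CachedPredictor:
    """Wraps a prediction function and reuses results for repeated comments."""

    def __init__(self, predict_fn: Callable[[str], int]):
        self._predict_fn = predict_fn
        self._cache: Dict[str, int] = {}
        self.api_calls = 0

    def predict(self, comment: str) -> int:
        # Normalize case and whitespace so trivial variants share one entry
        key = " ".join(comment.lower().split())
        if key not in self._cache:
            self.api_calls += 1
            self._cache[key] = self._predict_fn(comment)
        return self._cache[key]

# Stand-in for a real API call (hypothetical; always returns positive)
def fake_predict(comment: str) -> int:
    return 1

predictor = CachedPredictor(fake_predict)
sample_comments = ["Great place!", "great  place!", "Would not return."]
labels = [predictor.predict(c) for c in sample_comments]
print(f"Labels: {labels}, API calls made: {predictor.api_calls}")
```

Here three comments trigger only two calls, because the first two normalize to the same key.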

### References and Resources

**OpenAI Documentation:**
- API Reference: https://platform.openai.com/docs/api-reference
- Pricing: https://openai.com/pricing
- Best Practices: https://platform.openai.com/docs/guides/production-best-practices

**Prompt Engineering:**
- Prompt Engineering Guide: https://www.promptingguide.ai/
- Few-Shot Learning: https://platform.openai.com/docs/guides/prompt-engineering

**Dataset:**
- Inside Airbnb: http://insideairbnb.com/get-the-data.html

**Related Libraries:**
- OpenAI Python SDK: https://github.com/openai/openai-python
- Pandas: https://pandas.pydata.org/docs/
- Matplotlib: https://matplotlib.org/stable/contents.html