In [1]:
import json
import logging
import mimetypes
import os
from datetime import datetime
from typing import Dict, List, Optional

import boto3
import psycopg2
from dotenv import load_dotenv
from openai import OpenAI
from psycopg2.extras import Json

In [None]:
class DataProcess:
    """
    A class to manage support tickets with vector embeddings and image storage.

    This system handles:
    - Database operations with PostgreSQL and pgvector
    - S3 storage for ticket-related images
    - OpenAI embeddings for semantic search

    Class attributes:
        EMBEDDING_MODEL: Name of the OpenAI model used to embed ticket text.
        EMBEDDING_DIMENSIONS: Width of the pgvector ``embedding`` column.
        DEFAULT_IMAGE_CONTENT_TYPE: Content type used for an uploaded file
            when it cannot be guessed from the file name.
    """

    # Embedding model and the width of the column that stores its vectors.
    # The two must be changed together: the column only accepts vectors of
    # exactly EMBEDDING_DIMENSIONS entries.
    EMBEDDING_MODEL = "text-embedding-ada-002"
    EMBEDDING_DIMENSIONS = 1536

    # Fallback content type for uploads whose extension is not recognised.
    DEFAULT_IMAGE_CONTENT_TYPE = "image/png"

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize the DataProcess with configuration.

        Configures logging, loads ``.env`` variables, builds the configuration
        and creates the OpenAI and S3 clients. No database connection is
        opened here.

        Args:
            config_path: Optional path to a JSON configuration file
        """
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        self.logger = logging.getLogger(__name__)

        # Load environment variables
        load_dotenv()

        # Load configuration
        self.config = self._load_config(config_path)

        # Initialize clients (the OpenAI client is None when no key is set)
        self.openai_client = self._init_openai_client()
        self.s3_client = self._init_s3_client()
        self.has_vector_support = False

        # Store tickets data
        self.support_tickets = []

    def _load_config(self, config_path: Optional[str]) -> Dict:
        """
        Load configuration from a file or use the built-in defaults.

        The defaults point at services on localhost; the OpenAI key is read
        from the ``OPENAI_API_KEY`` environment variable. When ``config_path``
        is given, its JSON content is deep-merged over the defaults. A file
        that cannot be read or parsed is logged and otherwise ignored.

        Args:
            config_path: Path to a JSON configuration file

        Returns:
            Dict containing configuration settings
        """
        config = {
            "openai": {"api_key": os.getenv("OPENAI_API_KEY")},
            "database": {
                "host": "localhost",
                "port": "5435",
                "name": "postgres",
                "user": "postgres",
                "password": "postgres",
                "table_name": "vector_table",
            },
            "s3": {
                "bucket_name": "xyz-support-images",
                "access_key_id": "test",
                "secret_access_key": "test",
                "region": "us-east-1",
                "endpoint_url": "http://localhost:4566",
            },
            "data": {"tickets_file": "./data/vector_ticket_data.json"},
        }

        # Override with file config if provided
        if config_path:
            try:
                # JSON is UTF-8 by specification; do not depend on the locale.
                with open(config_path, "r", encoding="utf-8") as f:
                    file_config = json.load(f)
                # Deep merge the configurations
                self._merge_configs(config, file_config)
            except Exception as e:
                self.logger.error(f"Error loading config file: {e}")

        return config

    def _merge_configs(self, base: Dict, override: Dict) -> None:
        """
        Recursively merge two configuration dictionaries.

        Nested dicts present on both sides are merged key by key; any other
        value in ``override`` replaces the value in ``base``.

        Args:
            base: Base configuration dict that will be updated in place
            override: Dict with values to override in base
        """
        for key, value in override.items():
            if isinstance(value, dict) and key in base and isinstance(base[key], dict):
                self._merge_configs(base[key], value)
            else:
                base[key] = value

    def _init_openai_client(self) -> Optional["OpenAI"]:
        """
        Initialize the OpenAI client.

        Returns:
            OpenAI client instance, or None when no API key is configured.
            Without a key no client is built, so the warning below describes
            what actually happens: tickets are stored without embeddings
            instead of the constructor failing.
        """
        api_key = self.config["openai"]["api_key"]
        if not api_key:
            self.logger.warning("OpenAI API key not found. Embeddings will not work.")
            return None

        return OpenAI(api_key=api_key)

    def _init_s3_client(self) -> "botocore.client.BaseClient":
        """
        Initialize the S3 client.

        Returns:
            boto3 S3 client built from the ``s3`` section of the configuration
        """
        return boto3.client(
            "s3",
            aws_access_key_id=self.config["s3"]["access_key_id"],
            aws_secret_access_key=self.config["s3"]["secret_access_key"],
            region_name=self.config["s3"]["region"],
            endpoint_url=self.config["s3"]["endpoint_url"],
        )

    def _get_db_connection(self) -> "psycopg2.extensions.connection":
        """
        Create a database connection.

        Returns:
            PostgreSQL connection object; the caller is responsible for
            closing it

        Raises:
            Exception: If connection fails
        """
        db_config = self.config["database"]
        return psycopg2.connect(
            host=db_config["host"],
            port=db_config["port"],
            dbname=db_config["name"],
            user=db_config["user"],
            password=db_config["password"],
        )

    def load_tickets(self, file_path: Optional[str] = None) -> None:
        """
        Load support tickets data from a JSON file.

        On any error the failure is logged and ``support_tickets`` is reset
        to an empty list.

        Args:
            file_path: Path to JSON file containing ticket data; defaults to
                the configured ``data.tickets_file``
        """
        path = file_path or self.config["data"]["tickets_file"]
        try:
            with open(path, "r", encoding="utf-8") as f:
                self.support_tickets = json.load(f)
            self.logger.info(f"Loaded {len(self.support_tickets)} tickets from {path}")
        except Exception as e:
            self.logger.error(f"Error loading tickets from {path}: {e}")
            self.support_tickets = []

    def setup_database(self) -> bool:
        """
        Set up the database tables and extensions.

        Tries to enable pgvector and create the table with an ``embedding``
        column. If either step fails, the transaction is rolled back and the
        table is created without the vector column instead.

        Returns:
            bool: True if pgvector support is available, False otherwise
        """
        conn = None
        try:
            conn = self._get_db_connection()
            cur = conn.cursor()

            # Try creating the vector extension
            try:
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
                conn.commit()
                self.logger.info("pgvector extension enabled successfully")

                # Create table with vector support
                self._create_table_with_vector(cur, conn)
                self.has_vector_support = True
                return True

            except Exception as e:
                conn.rollback()
                # This branch is reached for extension AND table failures,
                # so the message must not blame the extension alone.
                self.logger.warning(f"Could not enable pgvector support: {e}")
                self.logger.info("Creating table without vector column")

                # Create table without vector support as fallback
                self._create_table_without_vector(cur, conn)
                self.has_vector_support = False
                return False

        except Exception as e:
            self.logger.error(f"Database setup error: {e}")
            if conn:
                conn.rollback()
            return False
        finally:
            if conn:
                conn.close()

    def _create_table_with_vector(self, cursor, connection) -> None:
        """
        Create database table with vector column.

        Also adds the ``embedding`` column to a table that already exists
        without it (for example one created earlier by the non-vector
        fallback); ``CREATE TABLE IF NOT EXISTS`` alone would leave such a
        table unchanged and every later vector insert would fail.

        Args:
            cursor: Database cursor
            connection: Database connection
        """
        # NOTE: the table name comes from configuration and is interpolated
        # as-is; it must be a trusted, valid SQL identifier.
        table_name = self.config["database"]["table_name"]
        vector_type = f"vector({self.EMBEDDING_DIMENSIONS})"
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id SERIAL PRIMARY KEY,
                ticket_id VARCHAR(20) UNIQUE NOT NULL,
                subject TEXT NOT NULL,
                description TEXT NOT NULL,
                customer JSONB NOT NULL,
                metadata JSONB NOT NULL,
                resolution JSONB,
                embedding {vector_type},
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """
        )
        # Upgrade a pre-existing table that lacks the vector column.
        cursor.execute(
            f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS embedding {vector_type};"
        )
        connection.commit()
        self.logger.info("Table with vector column created successfully")

    def _create_table_without_vector(self, cursor, connection) -> None:
        """
        Create database table without vector column.

        Args:
            cursor: Database cursor
            connection: Database connection
        """
        # NOTE: the table name comes from configuration and is interpolated
        # as-is; it must be a trusted, valid SQL identifier.
        table_name = self.config["database"]["table_name"]
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id SERIAL PRIMARY KEY,
                ticket_id VARCHAR(20) UNIQUE NOT NULL,
                subject TEXT NOT NULL,
                description TEXT NOT NULL,
                customer JSONB NOT NULL,
                metadata JSONB NOT NULL,
                resolution JSONB,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """
        )
        connection.commit()
        self.logger.info("Table without vector column created successfully")

    def setup_s3_bucket(self) -> None:
        """Create the S3 bucket if it doesn't exist."""
        bucket_name = self.config["s3"]["bucket_name"]
        try:
            # Check if bucket exists
            self.s3_client.head_bucket(Bucket=bucket_name)
            self.logger.info(f"Bucket {bucket_name} already exists")
        except Exception:
            # Any head_bucket failure is treated as "missing": try to create
            try:
                self.s3_client.create_bucket(Bucket=bucket_name)
                self.logger.info(f"Created S3 bucket: {bucket_name}")
            except Exception as e:
                self.logger.error(f"Error creating S3 bucket: {e}")

    @staticmethod
    def _ticket_images(ticket: Dict) -> List[Dict]:
        """
        Return the image entries of a ticket, or an empty list.

        Tolerates a missing or null ``metadata`` / ``images`` value so that
        one incomplete ticket cannot abort a whole batch.

        Args:
            ticket: Ticket data dictionary

        Returns:
            The list stored under ``ticket["metadata"]["images"]``, or ``[]``
        """
        metadata = ticket.get("metadata")
        if not isinstance(metadata, dict):
            return []
        return metadata.get("images") or []

    @staticmethod
    def _resolution_param(ticket: Dict) -> Optional["Json"]:
        """
        Build the query parameter for the ``resolution`` column.

        Args:
            ticket: Ticket data dictionary

        Returns:
            A ``Json`` wrapper around the resolution, or None (SQL NULL) when
            the ticket has no resolution or an explicit null one
        """
        resolution = ticket.get("resolution")
        return Json(resolution) if resolution is not None else None

    def upload_images(self) -> None:
        """
        Upload images from the data folder to S3.

        Ensures the bucket exists, collects every image referenced by the
        loaded tickets and uploads them one by one. Image entries without an
        ``s3_key`` are skipped with a warning.
        """
        self.setup_s3_bucket()

        # Gather all image data from tickets
        image_data = []
        for ticket in self.support_tickets:
            ticket_id = ticket.get("ticket_id")
            for image in self._ticket_images(ticket):
                s3_key = image.get("s3_key")
                if not s3_key:
                    self.logger.warning(
                        f"Skipping image without s3_key on ticket {ticket_id}"
                    )
                    continue
                image_data.append(
                    {
                        "s3_key": s3_key,
                        "description": image.get("description"),
                        "ticket_id": ticket_id,
                    }
                )

        # Upload each image
        for img_info in image_data:
            self._upload_single_image(img_info)

    def _upload_single_image(self, img_info: Dict) -> None:
        """
        Upload a single image to S3.

        The file is read from ``data/<s3_key>`` relative to the current
        working directory and stored under the same key. A missing local
        file is logged and skipped; any other error is logged.

        Args:
            img_info: Dict containing image information (``s3_key`` is used)
        """
        bucket_name = self.config["s3"]["bucket_name"]
        try:
            # Construct the local file path
            local_file_path = os.path.join("data", img_info["s3_key"])

            # Check if the file exists
            if not os.path.exists(local_file_path):
                self.logger.warning(f"Local image {local_file_path} not found")
                return

            # Read the file content
            with open(local_file_path, "rb") as file:
                file_content = file.read()

            # Derive the content type from the extension so that non-PNG
            # images are not served as PNG; keep PNG as the fallback.
            guessed_type, _encoding = mimetypes.guess_type(local_file_path)

            # Upload to S3
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=img_info["s3_key"],
                Body=file_content,
                ContentType=guessed_type or self.DEFAULT_IMAGE_CONTENT_TYPE,
            )
            self.logger.info(
                f"Uploaded image to s3://{bucket_name}/{img_info['s3_key']}"
            )
        except Exception as e:
            self.logger.error(f"Error uploading image {img_info['s3_key']}: {e}")

    def generate_embedding(self, text: str) -> Optional[List[float]]:
        """
        Generate embedding vector using OpenAI's embedding model.

        Args:
            text: Text to generate embedding for

        Returns:
            List of embedding values or None if generation fails or no
            OpenAI client is configured
        """
        if self.openai_client is None:
            self.logger.error(
                "Error generating embedding: OpenAI client is not configured"
            )
            return None

        try:
            response = self.openai_client.embeddings.create(
                model=self.EMBEDDING_MODEL, input=text
            )
            return response.data[0].embedding
        except Exception as e:
            self.logger.error(f"Error generating embedding: {e}")
            return None

    def verify_s3_images(self, ticket: Dict) -> Dict:
        """
        Verify that images exist in S3 bucket and add presigned URLs.

        Each image entry of the ticket is updated in place: ``exists`` is set
        to True/False and, when the object is found, ``presigned_url`` is
        added. Any S3 error is treated as "not found".

        Args:
            ticket: Ticket data dictionary (mutated in place)

        Returns:
            Updated ticket dictionary with image verification info
        """
        bucket_name = self.config["s3"]["bucket_name"]

        for image in self._ticket_images(ticket):
            s3_key = image.get("s3_key")
            if not s3_key:
                # Nothing to look up; record the image as absent.
                self.logger.warning("Image entry without s3_key cannot be verified")
                image["exists"] = False
                continue

            try:
                # Check if the image exists in S3
                self.s3_client.head_object(Bucket=bucket_name, Key=s3_key)

                # Add a presigned URL. NOTE: presigned URLs are time-limited,
                # so the value stored with the ticket eventually goes stale.
                presigned_url = self.s3_client.generate_presigned_url(
                    "get_object", Params={"Bucket": bucket_name, "Key": s3_key}
                )
                image["presigned_url"] = presigned_url
                image["exists"] = True

            except Exception as e:
                self.logger.warning(f"Image {s3_key} not found in S3: {e}")
                image["exists"] = False

        return ticket

    def insert_ticket(self, ticket: Dict) -> bool:
        """
        Insert a support ticket with its embedding into the database.

        The ticket's images are verified first. When vector support is on and
        an embedding can be generated, it is stored with the ticket;
        otherwise the ticket is stored without one. Existing rows with the
        same ``ticket_id`` are updated.

        Args:
            ticket: Ticket data dictionary

        Returns:
            bool: True if insertion succeeded, False otherwise
        """
        # Resolved up front so that logging can never raise KeyError itself.
        ticket_id = ticket.get("ticket_id", "<unknown>")
        conn = None
        try:
            # Verify S3 images
            ticket = self.verify_s3_images(ticket)

            # Connect to PostgreSQL
            conn = self._get_db_connection()
            cur = conn.cursor()

            # Generate embedding if vector support is available
            embedding = None
            if self.has_vector_support:
                # Generate text for embedding
                embedding_text = f"{ticket['subject']} {ticket['description']}"
                # "resolution" may be absent or null for unresolved tickets.
                resolution = ticket.get("resolution")
                if isinstance(resolution, dict) and "solution" in resolution:
                    embedding_text += f" {resolution['solution']}"

                # Generate embedding
                embedding = self.generate_embedding(embedding_text)

            table_name = self.config["database"]["table_name"]

            if self.has_vector_support and embedding:
                self._insert_with_vector(cur, table_name, ticket, embedding)
            else:
                self._insert_without_vector(cur, table_name, ticket)

            # Commit the transaction
            conn.commit()
            self.logger.info(f"Ticket {ticket_id} inserted successfully")
            return True

        except Exception as e:
            self.logger.error(f"Error inserting ticket {ticket_id}: {e}")
            if conn:
                conn.rollback()
            return False
        finally:
            if conn:
                conn.close()

    def _insert_with_vector(
        self, cursor, table_name: str, ticket: Dict, embedding: List[float]
    ) -> None:
        """
        Insert ticket with vector embedding (upsert on ``ticket_id``).

        Args:
            cursor: Database cursor
            table_name: Name of the database table
            ticket: Ticket data dictionary
            embedding: List of embedding values
        """
        cursor.execute(
            f"""
            INSERT INTO {table_name}
            (ticket_id, subject, description, customer, metadata, resolution, embedding, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s::vector, %s)
            ON CONFLICT (ticket_id)
            DO UPDATE SET
                subject = EXCLUDED.subject,
                description = EXCLUDED.description,
                customer = EXCLUDED.customer,
                metadata = EXCLUDED.metadata,
                resolution = EXCLUDED.resolution,
                embedding = EXCLUDED.embedding,
                created_at = EXCLUDED.created_at
            """,
            (
                ticket["ticket_id"],
                ticket["subject"],
                ticket["description"],
                Json(ticket["customer"]),
                Json(ticket["metadata"]),
                self._resolution_param(ticket),
                str(embedding),  # Convert list to string for casting
                datetime.now(),
            ),
        )

    def _insert_without_vector(self, cursor, table_name: str, ticket: Dict) -> None:
        """
        Insert ticket without vector embedding (upsert on ``ticket_id``).

        Args:
            cursor: Database cursor
            table_name: Name of the database table
            ticket: Ticket data dictionary
        """
        cursor.execute(
            f"""
            INSERT INTO {table_name}
            (ticket_id, subject, description, customer, metadata, resolution, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticket_id)
            DO UPDATE SET
                subject = EXCLUDED.subject,
                description = EXCLUDED.description,
                customer = EXCLUDED.customer,
                metadata = EXCLUDED.metadata,
                resolution = EXCLUDED.resolution,
                created_at = EXCLUDED.created_at
            """,
            (
                ticket["ticket_id"],
                ticket["subject"],
                ticket["description"],
                Json(ticket["customer"]),
                Json(ticket["metadata"]),
                self._resolution_param(ticket),
                datetime.now(),
            ),
        )

    def test_connections(self) -> None:
        """Test database and S3 connections and log the results."""
        self.logger.info("Testing Database Connection")
        conn = None
        try:
            conn = self._get_db_connection()
            cur = conn.cursor()
            cur.execute("SELECT version();")
            version = cur.fetchone()
            self.logger.info(f"Connected to PostgreSQL: {version[0]}")

            # Check pgvector extension
            cur.execute(
                "SELECT name, installed_version FROM pg_available_extensions WHERE name = 'vector';"
            )
            ext = cur.fetchone()
            if ext:
                self.logger.info(f"pgvector is available: {ext}")

                # Check if installed
                cur.execute(
                    "SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';"
                )
                installed = cur.fetchone()
                if installed:
                    self.logger.info(f"pgvector is installed: {installed}")
                else:
                    self.logger.info("pgvector is available but not installed yet")
            else:
                self.logger.info("pgvector extension is not available")
        except Exception as e:
            self.logger.error(f"PostgreSQL connection error: {e}")
        finally:
            # Close on every path; a failing query must not leak the connection.
            if conn:
                conn.close()

        self.logger.info("Testing S3 Connection")
        try:
            response = self.s3_client.list_buckets()
            self.logger.info(
                f"Connected to S3, found {len(response['Buckets'])} buckets"
            )
            for bucket in response["Buckets"]:
                self.logger.info(f" - {bucket['Name']}")
        except Exception as e:
            self.logger.error(f"S3 connection error: {e}")

    def process_all_tickets(self) -> None:
        """Process all loaded support tickets and log a success/failure summary."""
        success_count = 0
        fail_count = 0

        for ticket in self.support_tickets:
            if self.insert_ticket(ticket):
                success_count += 1
            else:
                fail_count += 1
                # .get(): a ticket without an id is a likely reason for the
                # failure and must not raise here and stop the batch.
                self.logger.error(
                    f"Failed to process ticket {ticket.get('ticket_id', '<unknown>')}"
                )

        self.logger.info(
            f"Processed {success_count} tickets successfully, {fail_count} failed"
        )

    def run(self, tickets_file: Optional[str] = None) -> None:
        """
        Run the full process of setting up and processing tickets.

        Steps, in order: test connections, set up the database, load the
        tickets, upload their images, insert the tickets.

        Args:
            tickets_file: Optional path to a JSON file with ticket data
        """
        self.test_connections()
        self.has_vector_support = self.setup_database()
        self.load_tickets(tickets_file)
        self.upload_images()
        self.process_all_tickets()

In [3]:
# Build the pipeline with the built-in default configuration (no config file is passed).
system = DataProcess()
# Run every stage in order: connection checks, database setup, ticket loading,
# image upload to S3, and ticket insertion.
system.run()

2025-06-10 15:38:29,412 - __main__ - INFO - Testing Database Connection
2025-06-10 15:38:29,434 - __main__ - INFO - Connected to PostgreSQL: PostgreSQL 17.4 (Debian 17.4-1.pgdg120+2) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
2025-06-10 15:38:29,444 - __main__ - INFO - pgvector is available: ('vector', None)
2025-06-10 15:38:29,445 - __main__ - INFO - pgvector is available but not installed yet
2025-06-10 15:38:29,445 - __main__ - INFO - Testing S3 Connection
2025-06-10 15:38:29,461 - __main__ - INFO - Connected to S3, found 1 buckets
2025-06-10 15:38:29,461 - __main__ - INFO -  - xyz-support-images
2025-06-10 15:38:29,497 - __main__ - INFO - pgvector extension enabled successfully
2025-06-10 15:38:29,503 - __main__ - INFO - Table with vector column created successfully
2025-06-10 15:38:29,504 - __main__ - INFO - Loaded 2 tickets from ./data/vector_ticket_data.json
2025-06-10 15:38:29,516 - __main__ - INFO - Bucket xyz-support-images already exists
