In [1]:
import json
import asyncio
import queue
import threading
from collections import defaultdict, deque
from typing import Dict, Any, Iterator, Generator, List
import time
import logging
from dataclasses import dataclass
import psycopg2
from psycopg2 import sql
import pika
from pika.exceptions import AMQPConnectionError
import orjson
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

In [2]:
@dataclass
class ProcessingConfig:
    batch_size: int = 100
    max_buffer_size: int = 1000
    flush_interval: float = 5.0  # seconds



In [3]:
class JSONEncoder(json.JSONEncoder):
    """Custom JSON encoder that handles datetime objects"""
    
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

In [4]:
class MemoryEfficientJSONParser:
    """Generator-based JSON parser for streaming data"""
    
    @staticmethod
    def parse_json_stream(stream: Iterator[bytes]) -> Generator[Dict[str, Any], None, None]:
        """
        Parse JSON data from a stream using generators to avoid loading everything into memory.
        """
        buffer = b""
        for chunk in stream:
            if isinstance(chunk, str):
                chunk = chunk.encode('utf-8')
            
            buffer += chunk
            
            # Try to parse complete JSON objects from buffer
            while buffer:
                try:
                    # Simple approach: look for complete JSON objects separated by newlines
                    decoded_buffer = buffer.decode('utf-8', errors='ignore')
                    
                    # Split by newlines and try to parse each line
                    lines = decoded_buffer.split('\n')
                    processed_lines = 0
                    
                    for i, line in enumerate(lines):
                        line = line.strip()
                        if not line:
                            processed_lines += 1
                            continue
                            
                        try:
                            obj = orjson.loads(line.encode())
                            yield obj
                            processed_lines += 1
                        except orjson.JSONDecodeError:
                            # This line might be incomplete, stop processing
                            break
                    
                    # Update buffer with unprocessed lines
                    if processed_lines == len(lines):
                        buffer = b""
                    else:
                        buffer = '\n'.join(lines[processed_lines:]).encode()
                        
                except Exception as e:
                    logger.warning(f"JSON parsing error: {e}")
                    buffer = b""
                    break
    
    @staticmethod
    def parse_single_json(data: bytes) -> Dict[str, Any]:
        """Parse single JSON object with error handling"""
        try:
            return orjson.loads(data)
        except orjson.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON: {e}")
            raise

In [5]:
class DataTransformer:
    """Transforms and aggregates data using iterator patterns"""
    
    def __init__(self, aggregation_window: int = 60):
        self.aggregation_window = aggregation_window
        self.aggregation_buffer = defaultdict(lambda: defaultdict(int))
        self.last_flush_time = time.time()
    
    def transform_data(self, data_stream: Generator[Dict[str, Any], None, None]) -> Generator[Dict[str, Any], None, None]:
        """
        Transform and aggregate data using generator pattern.
        Processes one record at a time to maintain constant memory.
        """
        for record in data_stream:
            try:
                # Simple transformation example
                transformed = self._transform_single_record(record)
                
                # Aggregate data
                self._aggregate_record(transformed)
                
                # Yield individual transformed record
                yield transformed
                
                # Check if we should flush aggregates
                if self._should_flush_aggregates():
                    yield from self._flush_aggregates()
                    
            except Exception as e:
                logger.error(f"Error transforming record: {e}")
                continue
    
    def _transform_single_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Transform a single record - ensure all values are JSON serializable"""
        transformed = record.copy()
        
        # Convert timestamp to ISO string if present
        if 'timestamp' in transformed:
            try:
                # Handle both numeric timestamps and ISO format strings
                ts = transformed['timestamp']
                if isinstance(ts, (int, float)):
                    # Convert numeric timestamp to ISO string
                    transformed['processed_at'] = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
                elif isinstance(ts, str):
                    # Assume it's already in ISO format or can be parsed
                    transformed['processed_at'] = ts
                else:
                    # Fallback to current time
                    transformed['processed_at'] = datetime.now(timezone.utc).isoformat()
            except (ValueError, TypeError, OSError) as e:
                logger.warning(f"Could not parse timestamp {transformed['timestamp']}: {e}")
                transformed['processed_at'] = datetime.now(timezone.utc).isoformat()
        else:
            # Add current timestamp if not present
            transformed['processed_at'] = datetime.now(timezone.utc).isoformat()
        
        # Add processing metadata with string timestamps
        transformed['_processed_ts'] = datetime.now(timezone.utc).isoformat()
        transformed['_transform_version'] = '1.0'
        
        # Normalize value if present
        if 'value' in transformed:
            try:
                transformed['normalized_value'] = float(transformed['value']) / 100.0
            except (ValueError, TypeError):
                transformed['normalized_value'] = 0.0
        
        # Ensure all values are JSON serializable
        return self._ensure_json_serializable(transformed)
    
    def _ensure_json_serializable(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure all values in the dictionary are JSON serializable"""
        serializable_data = {}
        for key, value in data.items():
            if isinstance(value, (str, int, float, bool, type(None))):
                serializable_data[key] = value
            elif isinstance(value, (list, tuple)):
                serializable_data[key] = [self._ensure_json_serializable_item(item) for item in value]
            elif isinstance(value, dict):
                serializable_data[key] = self._ensure_json_serializable(value)
            else:
                # Convert to string representation
                serializable_data[key] = str(value)
        return serializable_data
    
    def _ensure_json_serializable_item(self, item: Any) -> Any:
        """Ensure a single item is JSON serializable"""
        if isinstance(item, (str, int, float, bool, type(None))):
            return item
        elif isinstance(item, (list, tuple)):
            return [self._ensure_json_serializable_item(i) for i in item]
        elif isinstance(item, dict):
            return self._ensure_json_serializable(item)
        else:
            return str(item)
    
    def _aggregate_record(self, record: Dict[str, Any]):
        """Aggregate record data in memory-efficient way"""
        current_time = time.time()
        window_key = int(current_time // self.aggregation_window)
        
        # Example aggregation by type
        record_type = record.get('type', 'unknown')
        self.aggregation_buffer[window_key][record_type] += 1
    
    def _should_flush_aggregates(self) -> bool:
        """Check if aggregates should be flushed based on time"""
        return time.time() - self.last_flush_time >= self.aggregation_window
    
    def _flush_aggregates(self) -> Generator[Dict[str, Any], None, None]:
        """Flush aggregated data and yield aggregation records"""
        current_time = time.time()
        current_window = int(current_time // self.aggregation_window)
        
        # Remove old windows and yield aggregates
        windows_to_remove = []
        for window_key, aggregates in self.aggregation_buffer.items():
            if window_key < current_window - 1:  # Keep current and previous window
                yield {
                    'type': 'aggregation',
                    'window_start': window_key * self.aggregation_window,
                    'window_end': (window_key + 1) * self.aggregation_window,
                    'aggregates': dict(aggregates),
                    'processed_at': datetime.fromtimestamp(current_time, tz=timezone.utc).isoformat(),
                    'record_count': sum(aggregates.values()),
                    'window_start_iso': datetime.fromtimestamp(
                        window_key * self.aggregation_window, tz=timezone.utc
                    ).isoformat(),
                    'window_end_iso': datetime.fromtimestamp(
                        (window_key + 1) * self.aggregation_window, tz=timezone.utc
                    ).isoformat()
                }
                windows_to_remove.append(window_key)
        
        # Clean up flushed windows
        for window_key in windows_to_remove:
            del self.aggregation_buffer[window_key]
        
        self.last_flush_time = current_time

In [6]:
class DatabaseManager:
    """Manages database operations with connection pooling and proper type handling"""
    
    def __init__(self, db_config: Dict[str, Any]):
        self.db_config = db_config
        self.connection_pool = []
        self.max_pool_size = 5
    
    def get_connection(self):
        """Get a database connection from pool or create new one"""
        try:
            if self.connection_pool:
                return self.connection_pool.pop()
            else:
                return psycopg2.connect(**self.db_config)
        except Exception as e:
            logger.error(f"Database connection error: {e}")
            raise
    
    def return_connection(self, conn):
        """Return connection to pool"""
        if len(self.connection_pool) < self.max_pool_size and not conn.closed:
            self.connection_pool.append(conn)
        else:
            try:
                conn.close()
            except:
                pass
    
    def initialize_schema(self):
        """Initialize database schema with proper types"""
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                # Use TIMESTAMP WITH TIME ZONE for proper time handling
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS processed_data (
                        id SERIAL PRIMARY KEY,
                        data JSONB,
                        processed_at TIMESTAMP WITH TIME ZONE,
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    )
                """)
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS data_aggregations (
                        id SERIAL PRIMARY KEY,
                        window_start TIMESTAMP WITH TIME ZONE,
                        window_end TIMESTAMP WITH TIME ZONE,
                        aggregates JSONB,
                        record_count INTEGER,
                        created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                    )
                """)
                conn.commit()
                logger.info("Database schema initialized with proper types")
        except Exception as e:
            logger.error(f"Schema initialization error: {e}")
            conn.rollback()
        finally:
            self.return_connection(conn)

In [7]:
class MessageQueueManager:
    """Manages message queue operations"""
    
    def __init__(self, mq_config: Dict[str, Any]):
        self.mq_config = mq_config
        self._connection = None
        self._channel = None
    
    def ensure_connection(self):
        """Ensure message queue connection is established"""
        if self._connection is None or self._connection.is_closed:
            try:
                self._connection = pika.BlockingConnection(
                    pika.ConnectionParameters(**self.mq_config)
                )
                self._channel = self._connection.channel()
                self._channel.queue_declare(queue='processed_data', durable=True)
                logger.info("Message queue connection established")
            except AMQPConnectionError as e:
                logger.error(f"Message queue connection failed: {e}")
                raise
    
    def publish_message(self, message: Dict[str, Any]):
        """Publish message to queue"""
        self.ensure_connection()
        try:
            # Use custom JSON encoder for serialization
            message_body = json.dumps(message, cls=JSONEncoder, ensure_ascii=False)
            self._channel.basic_publish(
                exchange='',
                routing_key='processed_data',
                body=message_body.encode('utf-8'),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                    content_type='application/json',
                )
            )
        except Exception as e:
            logger.error(f"Message publishing failed: {e}")
            # Reset connection for next attempt
            self._connection = None
            raise

In [8]:
class OutputManager:
    """Manages output to database and message queue with proper type handling"""
    
    def __init__(self, db_config: Dict[str, Any], mq_config: Dict[str, Any]):
        self.db_manager = DatabaseManager(db_config)
        self.mq_manager = MessageQueueManager(mq_config)
        self.db_batch = []
        self.mq_batch = []
        self.json_encoder = JSONEncoder()
    
    def initialize(self):
        """Initialize output systems"""
        self.db_manager.initialize_schema()
        self.mq_manager.ensure_connection()
    
    def add_to_batch(self, record: Dict[str, Any], batch_type: str):
        """Add record to appropriate batch"""
        if batch_type == 'db':
            self.db_batch.append(record)
        else:
            self.mq_batch.append(record)
    
    def flush_batch(self, batch_type: str, force: bool = False):
        """Flush batch to destination"""
        if batch_type == 'db' and (force or len(self.db_batch) >= 100):
            self._flush_to_db()
        elif batch_type == 'mq' and (force or len(self.mq_batch) >= 50):
            self._flush_to_mq()
    
    def _convert_to_db_types(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Convert record fields to proper database types"""
        processed_record = record.copy()
        
        # Convert processed_at to datetime if it's a string
        if 'processed_at' in processed_record:
            processed_at = processed_record['processed_at']
            if isinstance(processed_at, str):
                try:
                    # Parse ISO format string to datetime
                    if processed_at.endswith('Z'):
                        processed_at = processed_at.replace('Z', '+00:00')
                    processed_record['processed_at'] = datetime.fromisoformat(processed_at)
                except (ValueError, TypeError) as e:
                    logger.warning(f"Could not parse processed_at {processed_at}: {e}")
                    # Fallback to current time
                    processed_record['processed_at'] = datetime.now(timezone.utc)
            elif isinstance(processed_at, (int, float)):
                # Convert numeric timestamp to datetime
                processed_record['processed_at'] = datetime.fromtimestamp(processed_at, tz=timezone.utc)
        
        return processed_record
    
    def _serialize_for_db(self, record: Dict[str, Any]) -> str:
        """Serialize record for database JSONB storage"""
        try:
            # Use orjson for efficient serialization if available, fallback to json with custom encoder
            return orjson.dumps(record).decode('utf-8')
        except:
            return json.dumps(record, cls=JSONEncoder, ensure_ascii=False)
    
    def _flush_to_db(self):
        """Flush batch to database with proper type conversion"""
        if not self.db_batch:
            return
            
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor() as cursor:
                for record in self.db_batch:
                    processed_record = self._convert_to_db_types(record)
                    serialized_data = self._serialize_for_db(record)
                    
                    if processed_record.get('type') == 'aggregation':
                        # Handle aggregation records
                        cursor.execute("""
                            INSERT INTO data_aggregations 
                            (window_start, window_end, aggregates, record_count)
                            VALUES (%s, %s, %s, %s)
                        """, (
                            datetime.fromtimestamp(processed_record['window_start'], tz=timezone.utc),
                            datetime.fromtimestamp(processed_record['window_end'], tz=timezone.utc),
                            serialized_data,  # Store the entire record as JSON
                            processed_record['record_count']
                        ))
                    else:
                        # Handle regular records
                        cursor.execute(
                            "INSERT INTO processed_data (data, processed_at) VALUES (%s, %s)",
                            (serialized_data, processed_record.get('processed_at'))
                        )
                conn.commit()
                logger.info(f"Flushed {len(self.db_batch)} records to database")
                self.db_batch.clear()
        except Exception as e:
            logger.error(f"Error flushing to database: {e}")
            conn.rollback()
        finally:
            self.db_manager.return_connection(conn)
    
    def _flush_to_mq(self):
        """Flush batch to message queue"""
        if not self.mq_batch:
            return
            
        success_count = 0
        for record in self.mq_batch:
            try:
                self.mq_manager.publish_message(record)
                success_count += 1
            except Exception as e:
                logger.error(f"Failed to publish message: {e}")
                continue
        
        if success_count > 0:
            logger.info(f"Flushed {success_count} records to message queue")
            # Only remove successfully published messages
            self.mq_batch = self.mq_batch[success_count:]

In [9]:
class DataProcessingPipeline:
    """Main pipeline that orchestrates the entire process"""
    
    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.parser = MemoryEfficientJSONParser()
        self.transformer = DataTransformer()
        self.output_manager = OutputManager(
            db_config={
                'host': 'localhost',
                'database': 'dataprocessing',
                'user': 'user',
                'password': 'pass',
                'port': 5272
            },
            mq_config={
                'host': 'localhost',
                'port': 5672
            }
        )
        self.input_queue = queue.Queue(maxsize=config.max_buffer_size)
        self.running = False
        self.processing_thread = None
        self.output_thread = None
    
    def start(self):
        """Start the pipeline"""
        self.running = True
        self.output_manager.initialize()
        
        # Start processing threads
        self.processing_thread = threading.Thread(target=self._process_data, daemon=True)
        self.output_thread = threading.Thread(target=self._handle_output, daemon=True)
        
        self.processing_thread.start()
        self.output_thread.start()
        
        logger.info("Data processing pipeline started")
    
    def stop(self):
        """Stop the pipeline gracefully"""
        self.running = False
        # Flush any remaining data
        self.output_manager.flush_batch('db', force=True)
        self.output_manager.flush_batch('mq', force=True)
        
        if self.processing_thread:
            self.processing_thread.join(timeout=5.0)
        if self.output_thread:
            self.output_thread.join(timeout=5.0)
        
        logger.info("Data processing pipeline stopped")
    
    def ingest_data(self, data: bytes):
        """Ingest data from webhook - non-blocking"""
        try:
            self.input_queue.put(data, block=False)
            return True
        except queue.Full:
            logger.warning("Input queue full, dropping data")
            return False
    
    def _process_data(self):
        """Process data from input queue using generators"""
        def input_generator():
            while self.running:
                try:
                    data = self.input_queue.get(timeout=1.0)
                    yield data
                    self.input_queue.task_done()
                except queue.Empty:
                    continue
        
        try:
            # Create processing pipeline using generators
            raw_data_stream = input_generator()
            parsed_stream = self.parser.parse_json_stream(raw_data_stream)
            transformed_stream = self.transformer.transform_data(parsed_stream)
            
            # Process transformed data
            for transformed_record in transformed_stream:
                # Route to appropriate output
                if transformed_record.get('type') == 'aggregation':
                    self.output_manager.add_to_batch(transformed_record, 'db')
                else:
                    self.output_manager.add_to_batch(transformed_record, 'mq')
                    self.output_manager.add_to_batch(transformed_record, 'db')
                    
        except Exception as e:
            logger.error(f"Processing thread error: {e}")
    
    def _handle_output(self):
        """Handle output batching and flushing"""
        last_flush = time.time()
        
        while self.running:
            current_time = time.time()
            
            # Periodic flushing
            if current_time - last_flush >= self.config.flush_interval:
                self.output_manager.flush_batch('db')
                self.output_manager.flush_batch('mq')
                last_flush = current_time
            
            time.sleep(0.1)  # Small sleep to prevent busy waiting

In [10]:
# Synchronous webhook handler
class WebhookHandler:
    """Handles incoming webhook requests synchronously"""
    
    def __init__(self, pipeline: DataProcessingPipeline):
        self.pipeline = pipeline
    
    def handle_webhook(self, request_data: bytes) -> Dict[str, Any]:
        """Handle incoming webhook data synchronously"""
        success = self.pipeline.ingest_data(request_data)
        return {
            "status": "accepted" if success else "rejected",
            "message": "Data queued for processing" if success else "System busy, try again later",
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

In [17]:
# Usage example
if __name__ == "__main__":
    config = ProcessingConfig(
        batch_size=100,
        max_buffer_size=1000,
        flush_interval=5.0
    )
    
    pipeline = DataProcessingPipeline(config)
    
    try:
        pipeline.start()
        
        # Simulate webhook data - SYNCHRONOUS calls
        webhook_handler = WebhookHandler(pipeline)
        
        # Simulate incoming data with various timestamp formats
        sample_data = [
            b'{"type": "event", "value": 100, "timestamp": 1234567890}',
            b'{"type": "event", "value": 200, "timestamp": "2023-10-05T12:00:00Z"}',
            b'{"type": "metric", "value": 50}',
            b'{"type": "error", "message": "test error", "timestamp": 1759619228}'
        ]
        
        for i, data in enumerate(sample_data):
            result = webhook_handler.handle_webhook(data)
            print(f"Data {i+1}: {result}")
            time.sleep(0.1)
        _time = 70
        # Keep running for a while to see aggregation
        print(f"Pipeline running for {_time} seconds to demonstrate aggregation...")
        time.sleep(_time)
        
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        pipeline.stop()

2025-10-06 12:16:22,487 - __main__ - INFO - Database schema initialized with proper types
2025-10-06 12:16:22,491 - pika.adapters.utils.connection_workflow - INFO - Pika version 1.3.2 connecting to ('127.0.0.1', 5672)
2025-10-06 12:16:22,492 - pika.adapters.utils.io_services_utils - INFO - Socket connected: <socket.socket fd=57, family=2, type=1, proto=6, laddr=('127.0.0.1', 37748), raddr=('127.0.0.1', 5672)>
2025-10-06 12:16:22,497 - pika.adapters.utils.connection_workflow - INFO - Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fbb6c49a660>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fbb6c49a660> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2025-10-06 12:16:22,511 - pika.adapters.utils.connection_workflow - INFO - AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapte

Data 1: {'status': 'accepted', 'message': 'Data queued for processing', 'timestamp': '2025-10-06T11:16:22.555315+00:00'}
Data 2: {'status': 'accepted', 'message': 'Data queued for processing', 'timestamp': '2025-10-06T11:16:22.658628+00:00'}
Data 3: {'status': 'accepted', 'message': 'Data queued for processing', 'timestamp': '2025-10-06T11:16:22.758973+00:00'}
Data 4: {'status': 'accepted', 'message': 'Data queued for processing', 'timestamp': '2025-10-06T11:16:22.859191+00:00'}
Pipeline running for 70 seconds to demonstrate aggregation...


2025-10-06 12:17:34,397 - __main__ - INFO - Flushed 4 records to database
2025-10-06 12:17:34,409 - __main__ - INFO - Flushed 4 records to message queue
2025-10-06 12:17:34,412 - __main__ - INFO - Data processing pipeline stopped
