# Market Data Service

**Real-time WebSocket tick data collection and processing system for algorithmic trading**

---

## Table of Contents

1. [Overview & Architecture](#section-1)
2. [Environment Setup](#section-2)
3. [Data Models with Pydantic](#section-3)
4. [Single WebSocket Connection](#section-4)
5. [Threaded WebSocket Connection](#section-5)
6. [JSONL Storage Implementation](#section-6)
7. [Three-Connection Manager](#section-7)
8. [Subscription Management](#section-8)
9. [Status Monitoring & Endpoints](#section-9)
10. [Complete Service Integration](#section-10)
11. [Real-World Testing](#section-11)
12. [Usage Examples & Utilities](#section-12)

---
<a id="section-1"></a>
## Section 1: Overview & Architecture

### Problem Statement

As a trader or algorithmic trading system, we need real-time market data converted into a usable format and stored in local files for later consumption.

### Success Criteria

- ✅ Receive real-time tick data from broker WebSocket
- ✅ Store data reliably in JSONL files
- ✅ Manage instrument subscription/unsubscription across 3 WebSocket connections
- ✅ Handle network disconnections gracefully with auto-reconnection
- ✅ Process 1000 ticks per second without data loss

### Architecture Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                    Market Data Service                          │
│                  (FastAPI Main Thread)                          │
└────────────┬────────────────────────────────────┬───────────────┘
             │                                    │
    ┌────────▼─────────┐              ┌─────────▼──────────┐
    │  Connection 1    │              │   Connection 2     │
    │   (Primary)      │              │   (Secondary)      │
    │   Thread 1       │              │   Thread 2         │
    │                  │              │                    │
    │  MODE: QUOTE     │              │  MODE: FULL        │
    │  Max: 3000 inst  │              │  Max: 3000 inst    │
    │  Trading Universe│              │  On-demand Full    │
    └────────┬─────────┘              └─────────┬──────────┘
             │                                   │
             │        ┌─────────────────┐        │
             │        │  Connection 3   │        │
             └────────►   (Tertiary)    ◄────────┘
                      │   Thread 3      │
                      │                 │
                      │  MODE: Failover │
                      │  Activated when │
                      │  1 or 2 fails   │
                      └────────┬────────┘
                               │
                      ┌────────▼─────────┐
                      │  JSONL Storage   │
                      │  market_data_    │
                      │  YYYY-MM-DD.jsonl│
                      └──────────────────┘
```

### Key Concepts

**WebSocket Modes:**
- **LTP (Last Traded Price):** Minimal data - only last price
- **QUOTE:** OHLC + Volume + Buy/Sell quantities
- **FULL:** Complete market depth + All quote data

**Connection Strategy:**
- **Connection 1:** Primary connection for all trading universe stocks in QUOTE mode
- **Connection 2:** Secondary connection activated for FULL mode data when needed
- **Connection 3:** Redundancy connection for failover scenarios
- Each connection supports up to 3000 instruments
- A single API key can open up to 3 WebSocket connections
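The allocation rules above can be checked before any connection opens. The sketch below is a hypothetical helper (`build_connection_plan` is not part of KiteTicker). It assumes Connection 1 carries the QUOTE universe, Connection 2 carries the FULL-mode list, and Connection 3 starts empty as a standby. It also drops repeated tokens and rejects any list over the 3000-instrument cap.

```python
from typing import Dict, Iterable, List, Tuple

MAX_PER_CONNECTION = 3000  # per-connection cap stated above


def _dedupe(tokens: Iterable[int]) -> List[int]:
    """Drop repeated tokens while keeping first-seen order."""
    return list(dict.fromkeys(tokens))


def build_connection_plan(
    universe: Iterable[int],
    full_mode: Iterable[int],
    capacity: int = MAX_PER_CONNECTION,
) -> Dict[int, Tuple[str, List[int]]]:
    """Hypothetical planner: map connection id -> (mode, tokens)."""
    quote_tokens = _dedupe(universe)
    full_tokens = _dedupe(full_mode)
    for name, tokens in (("QUOTE universe", quote_tokens), ("FULL list", full_tokens)):
        if len(tokens) > capacity:
            raise ValueError(f"{name} has {len(tokens)} tokens; cap is {capacity}")
    return {
        1: ("quote", quote_tokens),  # primary: trading universe
        2: ("full", full_tokens),    # secondary: on-demand market depth
        3: ("standby", []),          # tertiary: idle until a failover
    }


plan = build_connection_plan([408065, 738561, 408065], [969473])
print(plan[1])  # ('quote', [408065, 738561])
```

Validating the plan up front means a capacity error shows up at startup rather than as a rejected subscription on a live socket.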

**Performance Target:** Process 1000 ticks/second across all connections without data loss
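The following plain-arithmetic sketch roughly sizes what that target implies if it were sustained for a whole session, so it is an upper bound. It assumes the NSE cash-market session (09:15 to 15:30 IST, 22,500 seconds) and an average of 200 bytes per stored JSONL line. The byte figure is an assumption, so measure it on real files before relying on it.

```python
def estimate_daily_load(ticks_per_second: int, session_seconds: int, bytes_per_tick: int):
    """Return (ticks per day, bytes per day, average per-tick budget in ms)."""
    ticks = ticks_per_second * session_seconds
    size_bytes = ticks * bytes_per_tick
    budget_ms = 1000 / ticks_per_second
    return ticks, size_bytes, budget_ms


# 09:15 to 15:30 expressed in seconds since midnight
SESSION_SECONDS = (15 * 3600 + 30 * 60) - (9 * 3600 + 15 * 60)
ticks, size_bytes, budget_ms = estimate_daily_load(1000, SESSION_SECONDS, 200)
print(f"{ticks:,} ticks/day, {size_bytes / 1e9:.2f} GB/day, {budget_ms:.1f} ms per tick")
# 22,500,000 ticks/day, 4.50 GB/day, 1.0 ms per tick
```

A budget of about 1 ms per tick on a single consumer is tight, which argues for keeping disk writes off the WebSocket callback thread.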

---
<a id="section-2"></a>
## Section 2: Environment Setup

In [9]:
# Cell 1: Imports and Configuration

import logging
import os
import json
import time
import threading
from datetime import datetime, date
from typing import Dict, List, Optional, Any, Set
from pathlib import Path
from collections import defaultdict

# KiteConnect imports
from kiteconnect import KiteTicker

# Data handling
import pandas as pd

# Type validation
from pydantic import BaseModel, Field, validator

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("✓ All imports successful")

✓ All imports successful


In [10]:
# Cell 2: Credentials and Constants

# Load credentials from environment variables
API_KEY = os.getenv("KITE_API_KEY")
ACCESS_TOKEN = os.getenv("KITE_ACCESS_TOKEN")

if not API_KEY or not ACCESS_TOKEN:
    logger.warning(
        "Missing credentials. Set KITE_API_KEY and KITE_ACCESS_TOKEN "
        "environment variables before running WebSocket connections"
    )
else:
    logger.info("✓ Credentials loaded from environment")

# Configuration constants
MAX_INSTRUMENTS_PER_CONNECTION = 3000
MAX_WEBSOCKET_CONNECTIONS = 3

# Storage configuration
DATA_DIR = Path("./market_data")
DATA_DIR.mkdir(exist_ok=True)

# WebSocket modes (from KiteTicker)
MODE_LTP = "ltp"
MODE_QUOTE = "quote"
MODE_FULL = "full"

logger.info(f"✓ Configuration loaded")
logger.info(f"✓ Data directory: {DATA_DIR}")
logger.info(f"✓ Max instruments per connection: {MAX_INSTRUMENTS_PER_CONNECTION}")

2025-10-01 11:15:31,383 - __main__ - INFO - ✓ Configuration loaded
2025-10-01 11:15:31,384 - __main__ - INFO - ✓ Data directory: market_data
2025-10-01 11:15:31,387 - __main__ - INFO - ✓ Max instruments per connection: 3000


In [11]:
# Cell 3: Sample Instrument Tokens

# Sample instrument tokens for testing (from Zerodha examples)
SAMPLE_TOKENS = [
    408065, 738561, 341249, 1270529, 779521, 492033, 1510401, 1346049, 3050241,
    579329, 2863105, 261889, 81153, 119553, 5582849, 4774913, 6054401, 175361,
    4268801, 579329, 2953217, 408065, 969473, 1850625, 3465729, 1152769,
    4701441, 4752385, 356865, 424961, 4598529, 140033, 197633, 2585345, 1041153,
    3876097, 878593, 4843777, 857857, 225537, 177665, 2800641, 900609, 303617,
    70401, 418049, 2911489, 40193, 2815745, 884737, 519937, 232961, 2170625,
    345089, 54273, 108033, 558337, 738561, 633601, 415745, 134657, 1207553,
    4464129, 2905857, 2977281, 3834113, 6401, 895745, 3001089, 348929, 3924993,
    758529, 5215745, 758529, 1723649, 2939649, 112129, 951809, 2513665, 108033,
    806401, 3329, 3848705, 486657, 470529, 2714625, 3677697, 738561, 3431425,
    975873, 952577, 3721473
]

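# SAMPLE_TOKENS contains repeats (e.g. 579329, 738561), so de-duplicate
# while preserving order before slicing
UNIQUE_TOKENS = list(dict.fromkeys(SAMPLE_TOKENS))

# Trading universe - all instruments for Connection 1 (QUOTE mode)
TRADING_UNIVERSE = UNIQUE_TOKENS[:20]  # Use first 20 for demo

# Full mode instruments for Connection 2
FULL_MODE_INSTRUMENTS = UNIQUE_TOKENS[20:25]  # Use next 5 for demo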

logger.info(f"✓ Trading universe size: {len(TRADING_UNIVERSE)}")
logger.info(f"✓ Full mode instruments: {len(FULL_MODE_INSTRUMENTS)}")
logger.info(f"✓ Total sample tokens available: {len(SAMPLE_TOKENS)}")

2025-10-01 11:15:58,860 - __main__ - INFO - ✓ Trading universe size: 20
2025-10-01 11:15:58,861 - __main__ - INFO - ✓ Full mode instruments: 5
2025-10-01 11:15:58,863 - __main__ - INFO - ✓ Total sample tokens available: 92


---
<a id="section-3"></a>
## Section 3: Data Models with Pydantic

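We define Pydantic models for tick data and service status. Validating incoming ticks against these models catches malformed or incomplete payloads early, before they reach storage. Note that `TickData` requires `ohlc` and `change`, which QUOTE and FULL ticks carry but LTP-mode ticks do not.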

In [13]:
# Cell 4: Tick Data Models

class OHLCData(BaseModel):
    """OHLC data structure"""
    open: float
    high: float
    low: float
    close: float

class TickData(BaseModel):
    """Raw tick data received from KiteTicker"""
    tradable: bool
    mode: str
    instrument_token: int
    last_price: float
    last_traded_quantity: Optional[int] = None
    average_traded_price: Optional[float] = None
    volume_traded: Optional[int] = None
    total_buy_quantity: Optional[int] = None
    total_sell_quantity: Optional[int] = None
    ohlc: OHLCData
    change: float
    
    class Config:
        # Allow extra fields for FULL mode data (depth, oi, etc.)
        extra = "allow"

class ProcessedTick(BaseModel):
    """Processed tick with timestamp for storage"""
    tradable: bool
    mode: str
    instrument_token: int
    last_price: float
    last_traded_quantity: Optional[int] = None
    average_traded_price: Optional[float] = None
    volume_traded: Optional[int] = None
    total_buy_quantity: Optional[int] = None
    total_sell_quantity: Optional[int] = None
    ohlc: Dict[str, float]
    change: float
    timestamp: str
    connection_id: int  # Which connection received this
    
    class Config:
        extra = "allow"

# Test the models
sample_tick = {
    "tradable": True,
    "mode": "quote",
    "instrument_token": 738561,
    "last_price": 2500.50,
    "last_traded_quantity": 10,
    "average_traded_price": 2498.75,
    "volume_traded": 1500000,
    "total_buy_quantity": 50000,
    "total_sell_quantity": 45000,
    "ohlc": {"open": 2495.0, "high": 2510.0, "low": 2490.0, "close": 2505.0},
    "change": 0.5
}

tick = TickData(**sample_tick)
logger.info(f"✓ Tick model validated: {tick.instrument_token}")

processed = ProcessedTick(
    **sample_tick,
    timestamp=datetime.now().isoformat(),
    connection_id=1
)
logger.info(f"✓ Processed tick model validated with timestamp")

2025-10-01 11:31:05,795 - __main__ - INFO - ✓ Tick model validated: 738561
2025-10-01 11:31:05,796 - __main__ - INFO - ✓ Processed tick model validated with timestamp


In [14]:
# Cell 5: Status and Health Models

class ConnectionStatus(BaseModel):
    """Status of a single WebSocket connection"""
    connection_id: int
    is_connected: bool
    subscribed_instruments: List[int]
    instrument_count: int
    mode: str
    tick_count: int = 0
    last_tick_time: Optional[str] = None
    data_ingress_start_time: Optional[str] = None
    uptime_seconds: float = 0.0

class InstrumentSubscription(BaseModel):
    """Subscription status of an instrument"""
    instrument_token: int
    connection_id: int
    mode: str
    subscribed_at: str
    tick_count: int = 0

class HealthResponse(BaseModel):
    """Health check response"""
    status: str  # "healthy", "degraded", "down"
    timestamp: str
    active_connections: int
    total_subscribed_instruments: int
    kite_connected: bool
    last_tick_time: Optional[str] = None
    uptime_seconds: float
    ticks_per_second: float = 0.0
    connections: List[ConnectionStatus]

# Test status models
conn_status = ConnectionStatus(
    connection_id=1,
    is_connected=True,
    subscribed_instruments=[738561, 408065],
    instrument_count=2,
    mode="quote",
    tick_count=1500,
    last_tick_time=datetime.now().isoformat(),
    uptime_seconds=3600.0
)

health = HealthResponse(
    status="healthy",
    timestamp=datetime.now().isoformat(),
    active_connections=1,
    total_subscribed_instruments=2,
    kite_connected=True,
    uptime_seconds=3600.0,
    connections=[conn_status]
)

logger.info(f"✓ Status models validated")
logger.info(f"  Health status: {health.status}")
logger.info(f"  Active connections: {health.active_connections}")

2025-10-01 11:31:12,647 - __main__ - INFO - ✓ Status models validated
2025-10-01 11:31:12,651 - __main__ - INFO -   Health status: healthy
2025-10-01 11:31:12,653 - __main__ - INFO -   Active connections: 1


---
<a id="section-4"></a>
## Section 4: Single WebSocket Connection (Learning)

Learn how KiteTicker works with a basic non-threaded connection.  
This section demonstrates the callback pattern and basic subscription management.

In [15]:
### Code Cell 6: Basic WebSocket Setup
# Callback functions
def on_ticks_demo(ws, ticks):
    """Callback to receive ticks"""
    if len(ticks) > 0:
        logger.info(f"Received {len(ticks)} ticks")
        # Show first tick
        logger.info(f"Sample tick: Token={ticks[0]['instrument_token']}, "
                   f"Price={ticks[0]['last_price']}, Mode={ticks[0]['mode']}")

def on_connect_demo(ws, response):
    """Callback on successful connection"""
    logger.info(f"Successfully connected. Response: {response}")
    
    # Subscribe to a few instruments
    tokens = TRADING_UNIVERSE[:5]  # First 5 instruments
    ws.subscribe(tokens)
    
    # Set mode to QUOTE
    ws.set_mode(ws.MODE_QUOTE, tokens)
    logger.info(f"Subscribed to {len(tokens)} instruments in QUOTE mode")

def on_close_demo(ws, code, reason):
    """Callback when connection is closed"""
    logger.info(f"Connection closed: {code} - {reason}")

def on_error_demo(ws, code, reason):
    """Callback when connection encounters error"""
    logger.error(f"Connection error: {code} - {reason}")

def on_reconnect_demo(ws, attempts_count):
    """Callback when reconnection is attempted"""
    logger.info(f"Reconnecting: Attempt #{attempts_count}")

def on_noreconnect_demo(ws):
    """Callback when reconnection fails"""
    logger.error("Reconnect failed - max attempts exceeded")

logger.info("✓ Callback functions defined")
logger.info("\nTo test basic connection (DON'T RUN - blocks main thread):")
logger.info("""  
kws = KiteTicker(API_KEY, ACCESS_TOKEN)
kws.on_ticks = on_ticks_demo
kws.on_connect = on_connect_demo
kws.on_close = on_close_demo
kws.on_error = on_error_demo
kws.on_reconnect = on_reconnect_demo
kws.on_noreconnect = on_noreconnect_demo

# This will block the main thread
# kws.connect()
""")

2025-10-01 11:32:15,113 - __main__ - INFO - ✓ Callback functions defined
2025-10-01 11:32:15,114 - __main__ - INFO - 
To test basic connection (DON'T RUN - blocks main thread):
2025-10-01 11:32:15,115 - __main__ - INFO -   
kws = KiteTicker(API_KEY, ACCESS_TOKEN)
kws.on_ticks = on_ticks_demo
kws.on_connect = on_connect_demo
kws.on_close = on_close_demo
kws.on_error = on_error_demo
kws.on_reconnect = on_reconnect_demo
kws.on_noreconnect = on_noreconnect_demo

# This will block the main thread
# kws.connect()



**Important Notes on Basic WebSocket:**
- `kws.connect()` blocks by default: it runs the event loop on the calling thread, so nothing after it executes until the loop stops
- All interaction must happen through callbacks
- Auto-reconnection is enabled by default
- Use `kws.stop()` in `on_close` to prevent reconnection
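The following code expresses the last note as a small factory. The name `make_on_close` is hypothetical. It builds a close callback with the same `(ws, code, reason)` signature as `on_close_demo` and optionally calls `ws.stop()`. Leave `stop_on_close=False` when you want KiteTicker's built-in auto-reconnection to keep working.

```python
import logging

logger = logging.getLogger(__name__)


def make_on_close(stop_on_close: bool = True):
    """Return an on_close(ws, code, reason) callback.

    With stop_on_close=True the callback calls ws.stop(), which stops the
    event loop so no reconnection is attempted afterwards.
    """
    def on_close(ws, code, reason):
        logger.info("Connection closed: %s - %s", code, reason)
        if stop_on_close:
            ws.stop()
    return on_close


# Usage with a live ticker (not run here):
# kws.on_close = make_on_close(stop_on_close=True)
```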

---
<a id="section-5"></a>
## Section 5: Threaded WebSocket Connection

For real-world applications, we need threaded mode so the main thread  
can continue managing subscriptions and monitoring status.
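Threaded mode also means tick callbacks run on the WebSocket thread, so slow work there (disk writes, parsing) delays the next batch. A common pattern is to have `on_ticks` only enqueue batches and let a separate writer thread drain them. The sketch below applies it, assuming storage is the slow step. `TickPipeline` is a hypothetical name, and `sink` stands in for any function taking `(ticks, connection_id)`.

```python
import logging
import queue
import threading
from typing import Callable, Dict, List, Optional, Tuple

Batch = List[Dict]


class TickPipeline:
    """Hand tick batches from the WebSocket thread to one writer thread."""

    def __init__(self, sink: Callable[[Batch, int], None], maxsize: int = 10_000):
        self._sink = sink
        self._queue: "queue.Queue[Optional[Tuple[Batch, int]]]" = queue.Queue(maxsize=maxsize)
        self._worker = threading.Thread(target=self._drain, daemon=True)

    def start(self) -> None:
        self._worker.start()

    def on_ticks(self, ticks: Batch, connection_id: int) -> None:
        """Call from the on_ticks callback; blocks only if the queue is full."""
        self._queue.put((ticks, connection_id))

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # sentinel from stop()
                break
            ticks, connection_id = item
            try:
                self._sink(ticks, connection_id)
            except Exception:
                # Keep the worker alive if one batch fails to store
                logging.exception("Failed to store batch from connection %s", connection_id)

    def stop(self, timeout: float = 5.0) -> None:
        """Queue a sentinel after pending batches, then wait for the worker."""
        self._queue.put(None)
        self._worker.join(timeout)


# Usage with a stand-in sink that just collects ticks
received: List[Dict] = []
pipeline = TickPipeline(lambda ticks, cid: received.extend(ticks))
pipeline.start()
pipeline.on_ticks([{"instrument_token": 408065, "last_price": 2500.5}], 1)
pipeline.stop()
print(len(received))  # 1
```

With the `StorageManager` from Section 6, `storage.store_ticks` fits as the sink because its parameters are `(ticks, connection_id)`. The bounded queue is a design choice. If the writer falls behind, `put` eventually blocks the WebSocket thread rather than silently dropping ticks.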

In [16]:
### Code Cell 7: Threaded WebSocket Implementation

class ThreadedWebSocketConnection:
    """Manages a single threaded KiteTicker WebSocket connection"""
    
    def __init__(self, connection_id: int, api_key: str, access_token: str):
        self.connection_id = connection_id
        self.api_key = api_key
        self.access_token = access_token
        
        # Statistics
        self.tick_count = 0
        self.last_tick_time = None
        self.start_time = None
        self.subscribed_tokens: Set[int] = set()
        self.current_mode = MODE_QUOTE
        
        # Thread safety
        self.lock = threading.Lock()
        
        # KiteTicker instance
        self.kws = None
        
    def _on_ticks(self, ws, ticks):
        """Internal tick handler"""
        with self.lock:
            self.tick_count += len(ticks)
            self.last_tick_time = datetime.now().isoformat()
        
        logger.info(f"[Conn-{self.connection_id}] Received {len(ticks)} ticks")
        
    def _on_connect(self, ws, response):
        """Internal connect handler"""
        logger.info(f"[Conn-{self.connection_id}] Connected: {response}")
        self.start_time = datetime.now().isoformat()
        
    def _on_close(self, ws, code, reason):
        """Internal close handler"""
        logger.warning(f"[Conn-{self.connection_id}] Closed: {code} - {reason}")
        
    def _on_error(self, ws, code, reason):
        """Internal error handler"""
        logger.error(f"[Conn-{self.connection_id}] Error: {code} - {reason}")
        
    def _on_reconnect(self, ws, attempts_count):
        """Internal reconnect handler"""
        logger.info(f"[Conn-{self.connection_id}] Reconnecting: Attempt #{attempts_count}")
        
    def start(self):
        """Start the threaded WebSocket connection"""
        self.kws = KiteTicker(self.api_key, self.access_token)
        
        # Assign callbacks
        self.kws.on_ticks = self._on_ticks
        self.kws.on_connect = self._on_connect
        self.kws.on_close = self._on_close
        self.kws.on_error = self._on_error
        self.kws.on_reconnect = self._on_reconnect
        
        # Connect in threaded mode
        self.kws.connect(threaded=True)
        
        logger.info(f"[Conn-{self.connection_id}] Started in threaded mode")
        
    def subscribe(self, tokens: List[int], mode: str = MODE_QUOTE):
        """Subscribe to instruments"""
        if not self.kws or not self.is_connected():
            logger.error(f"[Conn-{self.connection_id}] Not connected")
            return False
            
        with self.lock:
            # Check capacity; tokens already subscribed are counted only once
            total_after = len(self.subscribed_tokens | set(tokens))
            if total_after > MAX_INSTRUMENTS_PER_CONNECTION:
                logger.error(f"[Conn-{self.connection_id}] Would exceed max capacity")
                return False
            
            self.kws.subscribe(tokens)
            self.kws.set_mode(mode, tokens)
            self.subscribed_tokens.update(tokens)
            self.current_mode = mode
            
        logger.info(f"[Conn-{self.connection_id}] Subscribed {len(tokens)} instruments in {mode} mode")
        return True
        
    def unsubscribe(self, tokens: List[int]):
        """Unsubscribe from instruments"""
        if not self.kws or not self.is_connected():
            return False
            
        with self.lock:
            self.kws.unsubscribe(tokens)
            self.subscribed_tokens -= set(tokens)
            
        logger.info(f"[Conn-{self.connection_id}] Unsubscribed {len(tokens)} instruments")
        return True
        
    def is_connected(self) -> bool:
        """Check if WebSocket is connected"""
        if not self.kws:
            return False
        return self.kws.is_connected()
        
    def stop(self):
        """Stop the connection"""
        if self.kws:
            self.kws.close()
            logger.info(f"[Conn-{self.connection_id}] Stopped")
            
    def get_status(self) -> ConnectionStatus:
        """Get connection status"""
        with self.lock:
            uptime = 0.0
            if self.start_time:
                start_dt = datetime.fromisoformat(self.start_time)
                uptime = (datetime.now() - start_dt).total_seconds()
                
            return ConnectionStatus(
                connection_id=self.connection_id,
                is_connected=self.is_connected(),
                subscribed_instruments=list(self.subscribed_tokens),
                instrument_count=len(self.subscribed_tokens),
                mode=self.current_mode,
                tick_count=self.tick_count,
                last_tick_time=self.last_tick_time,
                data_ingress_start_time=self.start_time,
                uptime_seconds=uptime
            )

logger.info("✓ ThreadedWebSocketConnection class defined")

2025-10-01 11:33:09,782 - __main__ - INFO - ✓ ThreadedWebSocketConnection class defined


In [17]:
### Code Cell 8: Test Threaded Connection
# Runs only when KITE_API_KEY and KITE_ACCESS_TOKEN are set; skipped otherwise

if API_KEY and ACCESS_TOKEN:
    test_conn = ThreadedWebSocketConnection(1, API_KEY, ACCESS_TOKEN)
    test_conn.start()
    
    # Wait up to 10 seconds for the handshake rather than a fixed sleep
    deadline = time.time() + 10
    while not test_conn.is_connected() and time.time() < deadline:
        time.sleep(0.2)
    
    # Subscribe to a few instruments
    test_conn.subscribe(TRADING_UNIVERSE[:5], MODE_QUOTE)
    
    # Let it run for 10 seconds
    time.sleep(10)
    
    # Get status
    status = test_conn.get_status()
    print(f"Connection status: {status.dict()}")
    
    # Stop
    test_conn.stop()
else:
    logger.info("✓ Threaded connection test skipped (credentials not set)")
    logger.info("  Set KITE_API_KEY and KITE_ACCESS_TOKEN, then re-run this cell")

2025-10-01 11:33:32,823 - __main__ - INFO - ✓ Threaded connection test skipped (credentials not set)
2025-10-01 11:33:32,824 - __main__ - INFO -   Set KITE_API_KEY and KITE_ACCESS_TOKEN, then re-run this cell


---
<a id="section-6"></a>
## Section 6: JSONL Storage Implementation

### Storage Strategy
We'll implement JSONL (JSON Lines) storage for tick data:
- **Date-based files**: `market_data_YYYY-MM-DD.jsonl`
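- **Timestamp addition**: each tick is stamped with the local time its batch was written
- **Serialized writes**: a lock ensures only one thread writes to the file at a time
- **File rotation**: the first write after midnight closes the previous day's file and opens a new one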
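A lock matters because callbacks from the three connections can arrive on different threads. In this standard-library sketch, four threads append to one file under a shared lock. The sketch then checks that every line parses as a complete JSON object. The helper `append_jsonl` is hypothetical.

```python
import json
import tempfile
import threading
from pathlib import Path
from typing import Dict, List

_write_lock = threading.Lock()


def append_jsonl(path: Path, records: List[Dict]) -> None:
    """Append records as JSON lines; the lock keeps each batch contiguous."""
    with _write_lock:
        with open(path, "a", encoding="utf-8") as fh:
            for record in records:
                fh.write(json.dumps(record) + "\n")


def writer(path: Path, connection_id: int, n: int) -> None:
    for i in range(n):
        append_jsonl(path, [{"connection_id": connection_id, "seq": i}])


path = Path(tempfile.mkdtemp()) / "market_data_test.jsonl"
threads = [threading.Thread(target=writer, args=(path, cid, 250)) for cid in range(1, 5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = path.read_text(encoding="utf-8").splitlines()
parsed = [json.loads(line) for line in lines]  # would raise on an interleaved line
print(len(parsed))  # 1000
```

Opening the file on every call keeps the sketch short. `StorageManager` instead keeps a single line-buffered handle open.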

### JSONL Format Benefits
- Line-delimited JSON (one JSON object per line)
- Easy to append without reading entire file
- Can process line-by-line for large files
- Human-readable and machine-parseable
- Compatible with pandas, data pipelines, etc.
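Because each line is a standalone JSON document, a reader can stream a day's file without loading it into memory. Below is a minimal generator sketch. The name `iter_jsonl` is hypothetical. The generator skips blank or truncated lines, such as a partially written final line after a crash.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, Iterator


def iter_jsonl(path: Path) -> Iterator[Dict]:
    """Yield one dict per valid line; blank or malformed lines are skipped."""
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue  # e.g. a line cut off mid-write


demo = Path(tempfile.mkdtemp()) / "market_data_demo.jsonl"
demo.write_text(
    '{"instrument_token": 408065, "last_price": 2500.5}\n'
    '\n'
    '{"instrument_token": 738561, "last_pri'  # truncated final line
)
print([t["instrument_token"] for t in iter_jsonl(demo)])  # [408065]
```

For analysis rather than streaming, `pandas.read_json(path, lines=True)` loads the same format into a DataFrame.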

### Sample JSONL Data
```json
{"tradable": true, "mode": "quote", "instrument_token": 408065, "last_price": 2500.50, "timestamp": "2025-10-01T09:15:23.123456"}
{"tradable": true, "mode": "quote", "instrument_token": 738561, "last_price": 1850.75, "timestamp": "2025-10-01T09:15:23.456789"}
```

In [None]:
import os
import json
import logging
from pathlib import Path
from datetime import datetime, date, timedelta  # timedelta is used by cleanup_old_files
from typing import List, Dict, Optional
import threading

class StorageManager:
    """Manages JSONL file storage for tick data"""
    
    def __init__(self, data_dir: str = './market_data'):
        """
        Initialize storage manager
        
        Args:
            data_dir: Directory for storing JSONL files
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        
        # Thread safety for file writes
        self.write_lock = threading.Lock()
        
        # Track current file
        self.current_date = None
        self.current_file_handle = None
        
        logging.info(f"StorageManager initialized. Data directory: {self.data_dir}")
    
    def get_filename_for_date(self, date_obj: date) -> str:
        """
        Get filename for a specific date
        
        Args:
            date_obj: Date object
        
        Returns:
            str: Filename in format market_data_YYYY-MM-DD.jsonl
        """
        return f"market_data_{date_obj.strftime('%Y-%m-%d')}.jsonl"
    
    def get_filepath_for_date(self, date_obj: date) -> Path:
        """
        Get full filepath for a specific date
        
        Args:
            date_obj: Date object
        
        Returns:
            Path: Full path to JSONL file
        """
        filename = self.get_filename_for_date(date_obj)
        return self.data_dir / filename
    
    def get_current_filename(self) -> str:
        """Get filename for current date"""
        return self.get_filename_for_date(date.today())
    
    def get_current_filepath(self) -> Path:
        """Get full filepath for current date"""
        return self.get_filepath_for_date(date.today())
    
    def _ensure_file_handle(self):
        """
        Ensure we have an open file handle for current date
        Handles automatic file rotation at midnight
        """
        today = date.today()
        
        # Check if date has changed (file rotation needed)
        if self.current_date != today:
            # Close old file if open
            if self.current_file_handle is not None:
                self.current_file_handle.close()
                logging.info(f"Closed file for {self.current_date}")
            
            # Open new file for today
            filepath = self.get_current_filepath()
            self.current_file_handle = open(filepath, 'a', buffering=1)  # Line buffering
            self.current_date = today
            
            logging.info(f"Opened new file: {filepath}")
    
    def store_ticks(self, ticks: List[Dict], connection_id: Optional[int] = None):
        """
        Store tick data to JSONL file
        
        Args:
            ticks: List of tick dictionaries
            connection_id: Optional connection ID for tracking
        """
        if not ticks:
            return
        
        timestamp = datetime.now().isoformat()
        
        with self.write_lock:
            try:
                # Ensure we have correct file handle
                self._ensure_file_handle()
                
                # Write each tick as a JSON line
                for tick in ticks:
                    # Create a copy to avoid modifying original
                    tick_copy = tick.copy()
                    
                    # Add metadata
                    tick_copy['timestamp'] = timestamp
                    if connection_id is not None:
                        tick_copy['connection_id'] = connection_id
                    
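                    # Write as single line; default=str serializes FULL-mode
                    # datetime fields (last_trade_time, exchange_timestamp)
                    json_line = json.dumps(tick_copy, default=str) + '\n'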
                    self.current_file_handle.write(json_line)
                
                # File is line-buffered, so it will flush automatically
                # But we can force flush for critical operations
                # self.current_file_handle.flush()
                
            except Exception as e:
                logging.error(f"Error storing ticks: {e}")
    
    def store_single_tick(self, tick: Dict, connection_id: Optional[int] = None):
        """
        Store a single tick (convenience method)
        
        Args:
            tick: Tick dictionary
            connection_id: Optional connection ID
        """
        self.store_ticks([tick], connection_id)
    
    def count_ticks_today(self) -> int:
        """
        Count number of ticks in today's file
        
        Returns:
            int: Number of ticks (lines in file)
        """
        filepath = self.get_current_filepath()
        
        if not filepath.exists():
            return 0
        
        try:
            with open(filepath, 'r') as f:
                return sum(1 for _ in f)
        except Exception as e:
            logging.error(f"Error counting ticks: {e}")
            return 0
    
    def count_ticks_in_file(self, filepath: Path) -> int:
        """
        Count ticks in a specific file
        
        Args:
            filepath: Path to JSONL file
        
        Returns:
            int: Number of ticks
        """
        if not filepath.exists():
            return 0
        
        try:
            with open(filepath, 'r') as f:
                return sum(1 for _ in f)
        except Exception as e:
            logging.error(f"Error counting ticks in {filepath}: {e}")
            return 0
    
    def get_file_size(self, filepath: Path) -> int:
        """
        Get file size in bytes
        
        Args:
            filepath: Path to file
        
        Returns:
            int: Size in bytes
        """
        if not filepath.exists():
            return 0
        
        return filepath.stat().st_size
    
    def get_file_size_mb(self, filepath: Path) -> float:
        """
        Get file size in megabytes
        
        Args:
            filepath: Path to file
        
        Returns:
            float: Size in MB
        """
        return self.get_file_size(filepath) / (1024 * 1024)
    
    def list_all_files(self) -> List[Path]:
        """
        List all JSONL files in data directory
        
        Returns:
            List[Path]: List of file paths
        """
        return sorted(self.data_dir.glob('market_data_*.jsonl'))
    
    def count_files(self) -> int:
        """
        Count total number of JSONL files
        
        Returns:
            int: Number of files
        """
        return len(self.list_all_files())
    
    def get_file_info(self, filepath: Path) -> Dict:
        """
        Get detailed information about a file
        
        Args:
            filepath: Path to file
        
        Returns:
            dict: File information
        """
        if not filepath.exists():
            return {'exists': False}
        
        stat = filepath.stat()
        
        return {
            'exists': True,
            'filename': filepath.name,
            'path': str(filepath),
            'size_bytes': stat.st_size,
            'size_mb': stat.st_size / (1024 * 1024),
            'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),  # creation time on Windows, last metadata change on Unix
            'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            'tick_count': self.count_ticks_in_file(filepath)
        }
    
    def get_storage_summary(self) -> Dict:
        """
        Get summary of all storage
        
        Returns:
            dict: Storage summary
        """
        files = self.list_all_files()
        
        total_size = sum(self.get_file_size(f) for f in files)
        total_ticks = sum(self.count_ticks_in_file(f) for f in files)
        
        return {
            'data_directory': str(self.data_dir),
            'total_files': len(files),
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'total_ticks': total_ticks,
            'files': [f.name for f in files],
            'current_file': self.get_current_filename(),
            'current_file_ticks': self.count_ticks_today()
        }
    
    def read_ticks_from_file(self, filepath: Path, max_ticks: Optional[int] = None) -> List[Dict]:
        """
        Read ticks from a JSONL file
        
        Args:
            filepath: Path to JSONL file
            max_ticks: Maximum number of ticks to read (None for all)
        
        Returns:
            List[Dict]: List of tick dictionaries
        """
        ticks = []
        
        if not filepath.exists():
            logging.warning(f"File not found: {filepath}")
            return ticks
        
        try:
            with open(filepath, 'r') as f:
                for i, line in enumerate(f):
                    if max_ticks is not None and i >= max_ticks:
                        break
                    if not line.strip():
                        continue  # skip blank lines instead of logging a parse error
                    
                    try:
                        tick = json.loads(line.strip())
                        ticks.append(tick)
                    except json.JSONDecodeError as e:
                        logging.error(f"Error parsing line {i+1}: {e}")
                        continue
        
        except Exception as e:
            logging.error(f"Error reading file {filepath}: {e}")
        
        return ticks
    
    def read_ticks_today(self, max_ticks: Optional[int] = None) -> List[Dict]:
        """
        Read ticks from today's file
        
        Args:
            max_ticks: Maximum number of ticks to read
        
        Returns:
            List[Dict]: List of tick dictionaries
        """
        filepath = self.get_current_filepath()
        return self.read_ticks_from_file(filepath, max_ticks)
    
    def cleanup_old_files(self, days_to_keep: int = 30):
        """
        Delete files older than specified days
        
        Args:
            days_to_keep: Number of days of data to keep
        """
        cutoff_date = date.today() - timedelta(days=days_to_keep)
        
        deleted_count = 0
        
        for filepath in self.list_all_files():
            # Extract date from filename
            try:
                # Format: market_data_YYYY-MM-DD.jsonl
                date_str = filepath.stem.replace('market_data_', '')
                file_date = datetime.strptime(date_str, '%Y-%m-%d').date()
                
                if file_date < cutoff_date:
                    filepath.unlink()
                    deleted_count += 1
                    logging.info(f"Deleted old file: {filepath.name}")
            
            except Exception as e:
                logging.error(f"Error processing {filepath.name}: {e}")
        
        logging.info(f"Cleanup complete. Deleted {deleted_count} files older than {days_to_keep} days")
        return deleted_count
    
    def close(self):
        """Close any open file handles"""
        with self.write_lock:
            if self.current_file_handle is not None:
                self.current_file_handle.close()
                self.current_file_handle = None
                logging.info("Closed storage file handle")
    
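    def __del__(self):
        """Best-effort cleanup on garbage collection"""
        # __del__ may run on a partially initialised instance (or during
        # interpreter shutdown), so it must never raise
        try:
            self.close()
        except Exception:
            pass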


# ============================================================================
# TEST STORAGE MANAGER
# ============================================================================

print("✅ StorageManager class defined")
print("\nTesting StorageManager...")

# Create test instance
test_storage = StorageManager('./test_market_data')

# Test 1: Store sample ticks
print("\n1. Storing sample ticks...")
sample_ticks = [
    {
        'tradable': True,
        'mode': 'quote',
        'instrument_token': 408065,
        'last_price': 2500.50,
        'last_traded_quantity': 10,
        'volume_traded': 100000,
        'ohlc': {'open': 2480.0, 'high': 2520.0, 'low': 2475.0, 'close': 2490.0}
    },
    {
        'tradable': True,
        'mode': 'quote',
        'instrument_token': 738561,
        'last_price': 1850.75,
        'last_traded_quantity': 5,
        'volume_traded': 50000,
        'ohlc': {'open': 1840.0, 'high': 1860.0, 'low': 1835.0, 'close': 1845.0}
    }
]

test_storage.store_ticks(sample_ticks, connection_id=1)
print(f"✅ Stored {len(sample_ticks)} ticks")

# Test 2: Check file creation
print("\n2. Checking file creation...")
current_file = test_storage.get_current_filepath()
if current_file.exists():
    print(f"✅ File created: {current_file}")
    print(f"   Tick count: {test_storage.count_ticks_today()}")
else:
    print("❌ File not created")

# Test 3: Read back ticks
print("\n3. Reading back ticks...")
read_ticks = test_storage.read_ticks_today(max_ticks=2)
if read_ticks:
    print(f"✅ Read {len(read_ticks)} ticks")
    print(f"   Sample tick: {json.dumps(read_ticks[0], indent=2)}")
else:
    print("❌ No ticks read")

# Test 4: Storage summary
print("\n4. Storage summary...")
summary = test_storage.get_storage_summary()
print(f"   Total files: {summary['total_files']}")
print(f"   Total ticks: {summary['total_ticks']}")
print(f"   Total size: {summary['total_size_mb']} MB")

# Test 5: File info
print("\n5. Current file info...")
file_info = test_storage.get_file_info(current_file)
print(f"   Filename: {file_info['filename']}")
print(f"   Size: {file_info['size_mb']:.4f} MB")
print(f"   Tick count: {file_info['tick_count']}")
print(f"   Created: {file_info['created']}")

# Cleanup
test_storage.close()

print("\n✅ StorageManager tested successfully")
print("\nKey Features:")
print("- Date-based file naming (market_data_YYYY-MM-DD.jsonl)")
print("- Automatic timestamp addition")
print("- Thread-safe atomic writes")
print("- File rotation at midnight")
print("- Connection tracking")
print("- Read/write utilities")
print("- Storage analytics")
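The retention rule in `cleanup_old_files()` depends only on the date embedded in the filename. The following is a minimal standalone sketch of that decision, assuming the `market_data_YYYY-MM-DD.jsonl` naming shown above. `file_date` and `is_expired` are hypothetical helper names, and `today` is passed in so the rule can be checked without touching the filesystem:

```python
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional

PREFIX = "market_data_"  # Assumed prefix, matching the naming scheme above


def file_date(path: Path) -> Optional[date]:
    """Parse the date from market_data_YYYY-MM-DD.jsonl; None if it doesn't match."""
    if not path.stem.startswith(PREFIX):
        return None
    try:
        return datetime.strptime(path.stem[len(PREFIX):], "%Y-%m-%d").date()
    except ValueError:
        return None


def is_expired(path: Path, today: date, days_to_keep: int = 30) -> bool:
    """Same comparison as cleanup_old_files(): strictly older than the cutoff."""
    file_day = file_date(path)
    if file_day is None:
        return False  # Never delete a file whose date cannot be parsed
    return file_day < today - timedelta(days=days_to_keep)


today = date(2024, 3, 31)  # With 30 days kept, the cutoff is 2024-03-01
print(is_expired(Path("market_data_2024-02-29.jsonl"), today))  # True
print(is_expired(Path("market_data_2024-03-01.jsonl"), today))  # False
print(is_expired(Path("notes.jsonl"), today))                   # False
```

Unlike the `.replace('market_data_', '')` approach in the class, this sketch skips files without the prefix instead of logging a parse error for each one.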

<a id="section-7"></a>
## Section 7: Three-Connection Manager

### Architecture Overview
We'll implement a manager that handles 3 WebSocket connections:
- **Connection 1 (Primary)**: QUOTE mode - All trading universe stocks
- **Connection 2 (Secondary)**: FULL mode - Activated on demand for detailed data
- **Connection 3 (Tertiary)**: Redundancy - Failover connection

### Key Features
- Each connection runs in a separate thread
- Max 3000 instruments per connection enforcement
- Duplicate subscription validation across all connections
- Per-connection tick counters
- Auto-reconnection handled by KiteTicker

### Design Considerations
- Thread-safe operations using locks
- Connection state tracking
- Graceful degradation on connection failure
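The duplicate and capacity rules can be modelled without any network code. Below is a hypothetical, in-memory `SubscriptionRegistry` (not part of the notebook's classes) that performs both checks and records the tokens inside a single lock acquisition. `MultiConnectionManager` checks capacity, checks duplicates, and records subscriptions in three separate lock acquisitions, so two threads could in principle both pass validation for the same token:

```python
import threading
from typing import Dict, Iterable, List, Set


class SubscriptionRegistry:
    """Hypothetical, network-free model of the manager's bookkeeping."""

    def __init__(self, connection_ids: Iterable[int] = (1, 2, 3),
                 max_per_connection: int = 3000):
        self.max_per_connection = max_per_connection
        self.subscriptions: Dict[int, Set[int]] = {cid: set() for cid in connection_ids}
        self.lock = threading.Lock()

    def try_add(self, connection_id: int, tokens: List[int]) -> Dict:
        """Validate and record tokens atomically; nothing is recorded on failure."""
        requested = set(tokens)  # Collapse repeats inside the request itself
        with self.lock:
            # Cross-connection duplicate check
            duplicates = sorted(
                t for t in requested
                if any(t in subs for subs in self.subscriptions.values())
            )
            if duplicates:
                return {'success': False, 'duplicates': duplicates}

            # Per-connection capacity check
            available = self.max_per_connection - len(self.subscriptions[connection_id])
            if len(requested) > available:
                return {'success': False,
                        'error': f'capacity {available}, requested {len(requested)}'}

            self.subscriptions[connection_id].update(requested)
            return {'success': True, 'added': len(requested)}


reg = SubscriptionRegistry(max_per_connection=3)
print(reg.try_add(1, [10, 11]))  # {'success': True, 'added': 2}
print(reg.try_add(2, [11, 12]))  # {'success': False, 'duplicates': [11]}
```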

import logging
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Set

from kiteconnect import KiteTicker  # Kite Connect Python client (pip install kiteconnect)

class MultiConnectionManager:
    """Manages 3 WebSocket connections with specific roles"""
    
    MAX_INSTRUMENTS_PER_CONNECTION = 3000
    
    def __init__(self, api_key: str, access_token: str, storage_manager):
        self.api_key = api_key
        self.access_token = access_token
        self.storage_manager = storage_manager
        
        # Connection instances
        self.connections: Dict[int, Optional[KiteTicker]] = {
            1: None,  # Primary - QUOTE mode
            2: None,  # Secondary - FULL mode
            3: None   # Tertiary - Redundancy
        }
        
        # Connection metadata
        self.connection_status: Dict[int, Dict] = {
            1: {'active': False, 'mode': 'QUOTE', 'role': 'Primary'},
            2: {'active': False, 'mode': 'FULL', 'role': 'Secondary'},
            3: {'active': False, 'mode': 'QUOTE', 'role': 'Tertiary'}
        }
        
        # Subscription tracking
        self.subscriptions: Dict[int, Set[int]] = {
            1: set(),
            2: set(),
            3: set()
        }
        
        # Performance metrics
        self.tick_counters: Dict[int, int] = {1: 0, 2: 0, 3: 0}
        self.last_tick_time: Dict[int, Optional[datetime]] = {1: None, 2: None, 3: None}
        self.start_time: Dict[int, Optional[datetime]] = {1: None, 2: None, 3: None}
        
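        # Thread safety. A re-entrant lock is required: get_status() calls
        # get_available_capacity() while already holding this lock, which would
        # deadlock with a plain threading.Lock.
        self.lock = threading.RLock()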
        
        logging.info("MultiConnectionManager initialized")
    
    def _create_callbacks(self, connection_id: int):
        """Create callbacks for a specific connection"""
        
        def on_ticks(ws, ticks):
            """Handle incoming ticks"""
            with self.lock:
                self.tick_counters[connection_id] += len(ticks)
                self.last_tick_time[connection_id] = datetime.now()
            
            # Store ticks
            self.storage_manager.store_ticks(ticks, connection_id)
            
            logging.debug(f"Connection {connection_id}: Received {len(ticks)} ticks")
        
        def on_connect(ws, response):
            """Handle connection establishment"""
            with self.lock:
                self.connection_status[connection_id]['active'] = True
                self.start_time[connection_id] = datetime.now()
            
            logging.info(f"Connection {connection_id} established: {response}")
            
            # Resubscribe if there were previous subscriptions (snapshot under the lock)
            with self.lock:
                tokens = list(self.subscriptions[connection_id])
                mode_name = self.connection_status[connection_id]['mode']
            
            if tokens:
                ws.subscribe(tokens)
                ws.set_mode(getattr(ws, f"MODE_{mode_name}"), tokens)
                logging.info(f"Connection {connection_id}: Resubscribed to {len(tokens)} instruments")
        
        def on_close(ws, code, reason):
            """Handle connection closure"""
            with self.lock:
                self.connection_status[connection_id]['active'] = False
            
            logging.warning(f"Connection {connection_id} closed: {code} - {reason}")
        
        def on_error(ws, code, reason):
            """Handle errors"""
            logging.error(f"Connection {connection_id} error: {code} - {reason}")
        
        def on_reconnect(ws, attempts_count):
            """Handle reconnection attempts"""
            logging.info(f"Connection {connection_id} reconnecting (attempt {attempts_count})")
        
        def on_noreconnect(ws):
            """Handle reconnection failure"""
            logging.error(f"Connection {connection_id} failed to reconnect")
            with self.lock:
                self.connection_status[connection_id]['active'] = False
        
        return on_ticks, on_connect, on_close, on_error, on_reconnect, on_noreconnect
    
    def start_connection(self, connection_id: int) -> bool:
        """Start a specific connection"""
        try:
            if self.connections[connection_id] is not None:
                logging.warning(f"Connection {connection_id} already exists")
                return False
            
            # Create KiteTicker instance
            kws = KiteTicker(self.api_key, self.access_token)
            
            # Assign callbacks
            callbacks = self._create_callbacks(connection_id)
            kws.on_ticks = callbacks[0]
            kws.on_connect = callbacks[1]
            kws.on_close = callbacks[2]
            kws.on_error = callbacks[3]
            kws.on_reconnect = callbacks[4]
            kws.on_noreconnect = callbacks[5]
            
            # Store instance
            self.connections[connection_id] = kws
            
            # Connect in threaded mode
            kws.connect(threaded=True)
            
            logging.info(f"Connection {connection_id} started in threaded mode")
            return True
            
        except Exception as e:
            logging.error(f"Failed to start connection {connection_id}: {e}")
            return False
    
    def stop_connection(self, connection_id: int) -> bool:
        """Stop a specific connection"""
        try:
            kws = self.connections.get(connection_id)
            if kws is None:
                logging.warning(f"Connection {connection_id} does not exist")
                return False
            
            # Unsubscribe all instruments first
            if self.subscriptions[connection_id]:
                tokens = list(self.subscriptions[connection_id])
                kws.unsubscribe(tokens)
            
            # Close connection
            kws.close()
            
            # Clear instance
            self.connections[connection_id] = None
            
            with self.lock:
                self.connection_status[connection_id]['active'] = False
                self.subscriptions[connection_id].clear()
            
            logging.info(f"Connection {connection_id} stopped")
            return True
            
        except Exception as e:
            logging.error(f"Failed to stop connection {connection_id}: {e}")
            return False
    
    def validate_subscription(self, tokens: List[int]) -> Dict:
        """Validate subscription request for duplicates"""
        duplicates = []
        
        with self.lock:
            for token in tokens:
                for conn_id, subs in self.subscriptions.items():
                    if token in subs:
                        duplicates.append({
                            'token': token,
                            'existing_connection': conn_id
                        })
                        break
        
        return {
            'valid': len(duplicates) == 0,
            'duplicates': duplicates
        }
    
    def get_available_capacity(self, connection_id: int) -> int:
        """Get remaining capacity for a connection"""
        with self.lock:
            current = len(self.subscriptions[connection_id])
            return self.MAX_INSTRUMENTS_PER_CONNECTION - current
    
    def subscribe_instruments(self, connection_id: int, tokens: List[int], mode: str = 'QUOTE') -> Dict:
        """Subscribe instruments to a specific connection"""
        try:
            # Validate connection
            kws = self.connections.get(connection_id)
            if kws is None or not kws.is_connected():
                return {
                    'success': False,
                    'error': f'Connection {connection_id} not available'
                }
            
            # Check capacity
            capacity = self.get_available_capacity(connection_id)
            if len(tokens) > capacity:
                return {
                    'success': False,
                    'error': f'Insufficient capacity. Available: {capacity}, Requested: {len(tokens)}'
                }
            
            # Validate for duplicates
            validation = self.validate_subscription(tokens)
            if not validation['valid']:
                return {
                    'success': False,
                    'error': 'Duplicate subscriptions found',
                    'duplicates': validation['duplicates']
                }
            
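            # Resolve the mode constant first, so an invalid mode name fails
            # before anything is sent over the socket
            mode_const = getattr(kws, f'MODE_{mode}')
            
            # Subscribe and set mode
            kws.subscribe(tokens)
            kws.set_mode(mode_const, tokens)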
            
            # Update tracking
            with self.lock:
                self.subscriptions[connection_id].update(tokens)
                self.connection_status[connection_id]['mode'] = mode
            
            logging.info(f"Connection {connection_id}: Subscribed {len(tokens)} instruments in {mode} mode")
            
            return {
                'success': True,
                'connection_id': connection_id,
                'subscribed_count': len(tokens),
                'mode': mode
            }
            
        except Exception as e:
            logging.error(f"Subscription failed on connection {connection_id}: {e}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def unsubscribe_instruments(self, tokens: List[int]) -> Dict:
        """Unsubscribe instruments from all connections"""
        results = []
        
        for conn_id, kws in self.connections.items():
            if kws is None or not kws.is_connected():
                continue
            
            # Find tokens subscribed to this connection
            with self.lock:
                to_unsubscribe = [t for t in tokens if t in self.subscriptions[conn_id]]
            
            if to_unsubscribe:
                try:
                    kws.unsubscribe(to_unsubscribe)
                    
                    with self.lock:
                        self.subscriptions[conn_id].difference_update(to_unsubscribe)
                    
                    results.append({
                        'connection_id': conn_id,
                        'unsubscribed_count': len(to_unsubscribe)
                    })
                    
                    logging.info(f"Connection {conn_id}: Unsubscribed {len(to_unsubscribe)} instruments")
                    
                except Exception as e:
                    logging.error(f"Unsubscription failed on connection {conn_id}: {e}")
        
        return {
            'success': len(results) > 0,
            'results': results
        }
    
    def unsubscribe_all(self) -> Dict:
        """Unsubscribe all instruments from all connections"""
        results = []
        
        for conn_id, kws in self.connections.items():
            if kws is None or not kws.is_connected():
                continue
            
            with self.lock:
                tokens = list(self.subscriptions[conn_id])
            
            if tokens:
                try:
                    kws.unsubscribe(tokens)
                    
                    with self.lock:
                        self.subscriptions[conn_id].clear()
                    
                    results.append({
                        'connection_id': conn_id,
                        'unsubscribed_count': len(tokens)
                    })
                    
                    logging.info(f"Connection {conn_id}: Unsubscribed all {len(tokens)} instruments")
                    
                except Exception as e:
                    logging.error(f"Unsubscribe all failed on connection {conn_id}: {e}")
        
        return {
            'success': len(results) > 0,
            'results': results
        }
    
    def get_status(self) -> Dict:
        """Get status of all connections"""
        with self.lock:
            status = {}
            for conn_id in [1, 2, 3]:
                kws = self.connections.get(conn_id)
                status[conn_id] = {
                    'role': self.connection_status[conn_id]['role'],
                    'mode': self.connection_status[conn_id]['mode'],
                    'active': kws.is_connected() if kws else False,
                    'subscribed_instruments': len(self.subscriptions[conn_id]),
                    'tick_count': self.tick_counters[conn_id],
                    'last_tick_time': self.last_tick_time[conn_id].isoformat() if self.last_tick_time[conn_id] else None,
                    'start_time': self.start_time[conn_id].isoformat() if self.start_time[conn_id] else None,
                    'capacity_remaining': self.get_available_capacity(conn_id)
                }
            return status

# Confirm the class is defined (opening live connections needs valid Kite credentials)
print("✅ MultiConnectionManager class defined")
print("Key features:")
print("- 3 WebSocket connections with specific roles")
print("- Duplicate subscription validation")
print("- Per-connection capacity management (3000 max)")
print("- Thread-safe operations")
print("- Comprehensive status tracking")
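`get_status()` calls `get_available_capacity()` while it already holds `self.lock`, and `SubscriptionManager.get_subscription_summary()` does the same. A plain `threading.Lock` is not re-entrant, so that nested acquire would block forever. A `threading.RLock` lets the thread that owns it acquire it again. The short, self-contained check below demonstrates the difference. `can_reacquire` is a hypothetical helper, and it uses a timeout so the demonstration cannot hang:

```python
import threading


def can_reacquire(lock) -> bool:
    """Try to take `lock` a second time from the thread that already holds it."""
    with lock:
        acquired = lock.acquire(timeout=0.1)  # Would block forever without a timeout
        if acquired:
            lock.release()
        return acquired


print(can_reacquire(threading.Lock()))   # False: a nested acquire would deadlock
print(can_reacquire(threading.RLock()))  # True: the owning thread may re-enter
```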

<a id="section-8"></a>
## Section 8: Subscription Management

### Subscription Strategy
- **Validation**: Check for duplicates across all connections before subscribing
- **Routing**: Direct instruments to appropriate connection based on mode
- **Capacity**: Enforce 3000 instrument limit per connection
- **Bulk Operations**: Support batch subscribe/unsubscribe
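The routing rule can be written as a small lookup. The following is a sketch under the assumption that, as in `SubscriptionManager`, the primary carries QUOTE and the secondary carries FULL. The notebook assigns no connection to LTP, so the sketch rejects it. `MODE_TO_CONNECTION` and `route` are hypothetical names:

```python
from typing import Dict

# Hypothetical routing table based on the roles in this notebook:
# Connection 1 (primary) carries QUOTE, Connection 2 (secondary) carries FULL.
# Connection 3 only mirrors the primary, so it is never a routing target.
MODE_TO_CONNECTION: Dict[str, int] = {'QUOTE': 1, 'FULL': 2}


def route(mode: str) -> int:
    """Return the connection id that should carry instruments in `mode`."""
    key = mode.strip().upper()
    if key not in MODE_TO_CONNECTION:
        raise ValueError(f"No connection is assigned to mode {mode!r}")
    return MODE_TO_CONNECTION[key]


print(route('quote'), route('FULL'))  # 1 2
```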

### Mode Selection
- **LTP**: Last Traded Price only (minimal data)
- **QUOTE**: LTP + OHLC + Volume (recommended for most use cases)
- **FULL**: Complete tick data with order book depth
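The manager resolves the mode with `getattr(kws, f'MODE_{mode}')`, so a string such as `'Quote'` surfaces as an `AttributeError` inside `subscribe_instruments()`. Here is a hedged sketch of validating the name up front. It also assumes, as the list above suggests, that each mode carries everything the previous one does, so `covers()` can tell whether an existing subscription already satisfies a request. The lower-case values are assumed to mirror `KiteTicker.MODE_LTP`/`MODE_QUOTE`/`MODE_FULL`. `normalize_mode` and `covers` are hypothetical helpers:

```python
# Lower-case values assumed to match KiteTicker.MODE_LTP / MODE_QUOTE / MODE_FULL
# in the kiteconnect client; verify against your installed version.
MODES = {'LTP': 'ltp', 'QUOTE': 'quote', 'FULL': 'full'}

# Ordered by how much data each tick carries (assumes each mode is a superset)
MODE_RANK = {'LTP': 0, 'QUOTE': 1, 'FULL': 2}


def normalize_mode(mode: str) -> str:
    """Return the upper-case key used in getattr(kws, f'MODE_{mode}')."""
    key = mode.strip().upper()
    if key not in MODES:
        raise ValueError(f"Unsupported mode {mode!r}; choose one of {', '.join(MODES)}")
    return key


def covers(current: str, requested: str) -> bool:
    """True if a subscription in `current` mode already delivers `requested` data."""
    return MODE_RANK[normalize_mode(current)] >= MODE_RANK[normalize_mode(requested)]


print(normalize_mode('Quote'))  # QUOTE
print(covers('FULL', 'QUOTE'))  # True
```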

### Best Practices
- Subscribe in batches of 100-500 instruments
- Use QUOTE mode for primary connection
- Activate FULL mode only when needed
- Always validate before subscribing
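`batch_subscribe()` slices the token list with `range(0, total, batch_size)`. Deduplicating first is worthwhile for two reasons. `validate_subscription()` only compares against tokens that are already subscribed, so a token repeated inside one request slips through. The capacity check also counts `len(tokens)`, so repeats inflate the count. The following is a minimal sketch of both steps. `dedupe` and `chunked` are hypothetical helpers, not part of the notebook's classes:

```python
from typing import Iterable, Iterator, List, Sequence


def dedupe(tokens: Iterable[int]) -> List[int]:
    """Drop repeated tokens while keeping first-seen order."""
    return list(dict.fromkeys(tokens))


def chunked(tokens: Sequence[int], batch_size: int = 500) -> Iterator[List[int]]:
    """Yield consecutive slices of at most batch_size tokens."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(tokens), batch_size):
        yield list(tokens[start:start + batch_size])


tokens = dedupe([101, 102, 101] + list(range(1000, 2200)))  # 1202 unique tokens
print([len(b) for b in chunked(tokens, batch_size=500)])      # [500, 500, 202]
```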

class SubscriptionManager:
    """High-level subscription management with validation and routing"""
    
    def __init__(self, connection_manager: MultiConnectionManager):
        self.conn_mgr = connection_manager
        self.subscription_history = []
    
    def subscribe_to_primary(self, tokens: List[int]) -> Dict:
        """Subscribe instruments to primary connection (Connection 1) in QUOTE mode"""
        logging.info(f"Subscribing {len(tokens)} instruments to primary connection")
        
        result = self.conn_mgr.subscribe_instruments(
            connection_id=1,
            tokens=tokens,
            mode='QUOTE'
        )
        
        if result['success']:
            self.subscription_history.append({
                'timestamp': datetime.now().isoformat(),
                'action': 'subscribe',
                'connection': 1,
                'count': len(tokens),
                'mode': 'QUOTE'
            })
        
        return result
    
    def subscribe_to_secondary(self, tokens: List[int]) -> Dict:
        """Subscribe instruments to secondary connection (Connection 2) in FULL mode"""
        logging.info(f"Subscribing {len(tokens)} instruments to secondary connection")
        
        # Ensure connection 2 is started
        if self.conn_mgr.connections[2] is None:
            logging.info("Starting secondary connection...")
            self.conn_mgr.start_connection(2)
            time.sleep(2)  # Wait for connection
        
        result = self.conn_mgr.subscribe_instruments(
            connection_id=2,
            tokens=tokens,
            mode='FULL'
        )
        
        if result['success']:
            self.subscription_history.append({
                'timestamp': datetime.now().isoformat(),
                'action': 'subscribe',
                'connection': 2,
                'count': len(tokens),
                'mode': 'FULL'
            })
        
        return result
    
    def activate_tertiary_connection(self) -> bool:
        """Activate tertiary connection as a redundant mirror of the primary"""
        logging.info("Activating tertiary connection...")
        
        if self.conn_mgr.connections[3] is not None:
            return True  # Already started
        
        # Mirror the primary's instruments. They are recorded before connecting so
        # that on_connect() subscribes them in QUOTE mode. Routing them through
        # subscribe_instruments() would always fail, because validate_subscription()
        # reports every token as a duplicate of Connection 1.
        with self.conn_mgr.lock:
            self.conn_mgr.subscriptions[3] = set(self.conn_mgr.subscriptions[1])
            self.conn_mgr.connection_status[3]['mode'] = 'QUOTE'
        
        success = self.conn_mgr.start_connection(3)
        if not success:
            with self.conn_mgr.lock:
                self.conn_mgr.subscriptions[3].clear()
        
        return success
    
    def batch_subscribe(self, tokens: List[int], connection_id: int = 1, 
                       batch_size: int = 500, mode: str = 'QUOTE') -> Dict:
        """Subscribe instruments in batches"""
        results = []
        total_tokens = len(tokens)
        
        logging.info(f"Batch subscribing {total_tokens} instruments (batch size: {batch_size})")
        
        for i in range(0, total_tokens, batch_size):
            batch = tokens[i:i + batch_size]
            
            result = self.conn_mgr.subscribe_instruments(
                connection_id=connection_id,
                tokens=batch,
                mode=mode
            )
            
            results.append(result)
            
            if not result['success']:
                logging.warning(f"Batch {i//batch_size + 1} failed: {result.get('error')}")
            
            time.sleep(0.5)  # Brief pause between batches
        
        successful = sum(1 for r in results if r['success'])
        
        return {
            'total_batches': len(results),
            'successful_batches': successful,
            'failed_batches': len(results) - successful,
            'results': results
        }
    
    def get_subscription_summary(self) -> Dict:
        """Get comprehensive subscription summary"""
        with self.conn_mgr.lock:
            total_subscriptions = sum(len(subs) for subs in self.conn_mgr.subscriptions.values())
            
            return {
                'total_subscriptions': total_subscriptions,
                'by_connection': {
                    conn_id: {
                        'count': len(subs),
                        'capacity_used_pct': (len(subs) / self.conn_mgr.MAX_INSTRUMENTS_PER_CONNECTION) * 100,
                        'capacity_remaining': self.conn_mgr.get_available_capacity(conn_id)
                    }
                    for conn_id, subs in self.conn_mgr.subscriptions.items()
                },
                'history_count': len(self.subscription_history)
            }
    
    def validate_and_subscribe(self, tokens: List[int], connection_id: int = 1, 
                              mode: str = 'QUOTE') -> Dict:
        """Validate before subscribing"""
        # Check for duplicates
        validation = self.conn_mgr.validate_subscription(tokens)
        
        if not validation['valid']:
            return {
                'success': False,
                'error': 'Validation failed',
                'duplicates': validation['duplicates']
            }
        
        # Check capacity
        capacity = self.conn_mgr.get_available_capacity(connection_id)
        if len(tokens) > capacity:
            return {
                'success': False,
                'error': f'Insufficient capacity. Need {len(tokens)}, have {capacity}'
            }
        
        # Subscribe
        return self.conn_mgr.subscribe_instruments(connection_id, tokens, mode)

# Confirm the class is defined (live subscriptions need an active connection)
print("✅ SubscriptionManager class defined")
print("Features:")
print("- Batch subscription with configurable size")
print("- Automatic validation before subscribing")
print("- Connection routing (primary/secondary/tertiary)")
print("- Subscription history tracking")
print("- Comprehensive summary reporting")
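`subscribe_to_secondary()` waits a fixed `time.sleep(2)` after starting Connection 2, and `subscribe_instruments()` then rejects the request if `is_connected()` is still `False`. A polling helper waits only as long as needed, up to a bound. The sketch below is hypothetical (`wait_until` is not part of the notebook):

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float = 10.0,
               interval: float = 0.1) -> bool:
    """Poll predicate() until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout  # monotonic: immune to wall-clock changes
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In the manager this might be called as `wait_until(lambda: self.conn_mgr.connections[2].is_connected(), timeout=10)` in place of the fixed sleep.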

<a id="section-9"></a>
## Section 9: Status Monitoring & Endpoints

### Simulated REST Endpoints
We'll implement methods that simulate REST API endpoints (no FastAPI server is started here):
- `/health` - Overall service health
- `/connections` - Per-connection statistics
- `/instruments` - Subscription details
- `/storage` - JSONL storage status
- `/metrics` - Aggregated performance metrics

### Monitoring Metrics
- **Uptime**: Service running duration
- **Tick Rate**: Ticks processed per second
- **Connection Health**: Active connections and their status
- **Data Freshness**: Time since last tick received

### Output Format
Each endpoint returns a plain Python dictionary, which a real API would serialize as the JSON response body.
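Two serialization details matter when these dictionaries become JSON. `datetime` objects are not JSON-serializable by default, which is why `pretty_print_endpoint()` passes `default=str`. Integer keys such as connection ids also become strings. The following is a minimal illustration with made-up values:

```python
import json
from datetime import datetime

response = {'timestamp': datetime(2024, 1, 1, 9, 15), 'connections': {1: {'active': True}}}

# default=str falls back to str(obj) for unsupported types such as datetime.
# str() uses a space separator, unlike .isoformat(), which uses 'T'.
# The int key 1 is emitted as the string "1".
print(json.dumps(response, default=str))
# {"timestamp": "2024-01-01 09:15:00", "connections": {"1": {"active": true}}}
```

Clients that look up a connection by id should therefore use `"1"`, not `1`, after parsing.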

from datetime import datetime, timedelta

class MonitoringEndpoints:
    """Simulated REST API endpoints for monitoring"""
    
    def __init__(self, connection_manager: MultiConnectionManager, 
                 subscription_manager: SubscriptionManager,
                 storage_manager):
        self.conn_mgr = connection_manager
        self.sub_mgr = subscription_manager
        self.storage_mgr = storage_manager
        self.service_start_time = datetime.now()
    
    def health(self) -> Dict:
        """
        GET /health
        Returns overall service health status
        """
        now = datetime.now()
        uptime_seconds = (now - self.service_start_time).total_seconds()
        
        # Get active connections
        active_connections = sum(
            1 for kws in self.conn_mgr.connections.values() 
            if kws and kws.is_connected()
        )
        
        # Get total subscriptions
        with self.conn_mgr.lock:
            total_subscriptions = sum(
                len(subs) for subs in self.conn_mgr.subscriptions.values()
            )
        
        # Get last tick time across all connections
        last_tick_times = [
            t for t in self.conn_mgr.last_tick_time.values() if t is not None
        ]
        last_tick_time = max(last_tick_times) if last_tick_times else None
        
        # Calculate data freshness
        data_fresh = False
        if last_tick_time:
            seconds_since_last_tick = (now - last_tick_time).total_seconds()
            data_fresh = seconds_since_last_tick < 5  # Fresh if within 5 seconds
        
        return {
            'status': 'healthy' if active_connections > 0 and data_fresh else 'degraded',
            'timestamp': now.isoformat(),
            'uptime_seconds': round(uptime_seconds, 2),
            'uptime_human': str(timedelta(seconds=int(uptime_seconds))),
            'active_connections': active_connections,
            'total_connections': 3,
            'subscribed_instruments': total_subscriptions,
            'kite_connected': active_connections > 0,
            'last_tick_time': last_tick_time.isoformat() if last_tick_time else None,
            'data_fresh': data_fresh,
            'service_start_time': self.service_start_time.isoformat()
        }
    
    def connections(self) -> Dict:
        """
        GET /connections
        Returns detailed status of each WebSocket connection
        """
        status = self.conn_mgr.get_status()
        
        # Enhance with calculated metrics
        for conn_id, info in status.items():
            # Calculate uptime for this connection
            if info['start_time']:
                start = datetime.fromisoformat(info['start_time'])
                uptime = (datetime.now() - start).total_seconds()
                info['uptime_seconds'] = round(uptime, 2)
            else:
                info['uptime_seconds'] = 0
            
            # Calculate tick rate (ticks per second)
            if info['uptime_seconds'] > 0:
                info['tick_rate'] = round(info['tick_count'] / info['uptime_seconds'], 2)
            else:
                info['tick_rate'] = 0
            
            # Data freshness
            if info['last_tick_time']:
                last_tick = datetime.fromisoformat(info['last_tick_time'])
                seconds_ago = (datetime.now() - last_tick).total_seconds()
                info['seconds_since_last_tick'] = round(seconds_ago, 2)
            else:
                info['seconds_since_last_tick'] = None
        
        return {
            'timestamp': datetime.now().isoformat(),
            'connections': status
        }
    
    def instruments(self) -> Dict:
        """
        GET /instruments
        Returns subscription details for all instruments
        """
        with self.conn_mgr.lock:
            instruments_by_connection = {}
            all_instruments = set()
            
            for conn_id, tokens in self.conn_mgr.subscriptions.items():
                instruments_by_connection[conn_id] = {
                    'count': len(tokens),
                    'tokens': list(tokens),
                    'mode': self.conn_mgr.connection_status[conn_id]['mode']
                }
                all_instruments.update(tokens)
        
        # Get subscription summary
        summary = self.sub_mgr.get_subscription_summary()
        
        return {
            'timestamp': datetime.now().isoformat(),
            'total_unique_instruments': len(all_instruments),
            'by_connection': instruments_by_connection,
            'capacity_summary': summary['by_connection'],
            'subscription_history_count': summary['history_count']
        }
    
    def storage(self) -> Dict:
        """
        GET /storage
        Returns storage status (JSONL files)
        """
        # Only StorageManager methods exercised elsewhere in this notebook are used here
        return {
            'timestamp': datetime.now().isoformat(),
            'storage_type': 'JSONL',
            'current_file': self.storage_mgr.get_current_filepath().name,
            'total_files': sum(1 for _ in self.storage_mgr.list_all_files()),
            'total_ticks_today': self.storage_mgr.count_ticks_today()
        }
    
    def metrics(self) -> Dict:
        """
        GET /metrics
        Returns aggregated performance metrics
        """
        total_ticks = sum(self.conn_mgr.tick_counters.values())
        uptime = (datetime.now() - self.service_start_time).total_seconds()
        
        avg_tick_rate = round(total_ticks / uptime, 2) if uptime > 0 else 0
        
        return {
            'timestamp': datetime.now().isoformat(),
            'total_ticks_received': total_ticks,
            'uptime_seconds': round(uptime, 2),
            'average_tick_rate': avg_tick_rate,
            'ticks_by_connection': self.conn_mgr.tick_counters.copy(),
            'target_tick_rate': 1000,  # From requirements
            'performance_pct': round((avg_tick_rate / 1000) * 100, 2)
        }

# Helper function to pretty print endpoint responses
def pretty_print_endpoint(endpoint_name: str, response: Dict):
    """Pretty print endpoint response"""
    print(f"\n{'='*60}")
    print(f"Endpoint: {endpoint_name}")
    print('='*60)
    print(json.dumps(response, indent=2, default=str))
    print('='*60)

print("✅ MonitoringEndpoints class defined")
print("Available endpoints:")
print("- health() - Overall service status")
print("- connections() - Per-connection details")
print("- instruments() - Subscription information")
print("- storage() - Storage status")
print("- metrics() - Performance metrics")

<a id="section-10"></a>
## Section 10: Complete Service Integration

### MarketDataService Class
This is the main service class that integrates all components:
- MultiConnectionManager (WebSocket handling)
- SubscriptionManager (Subscription logic)
- StorageManager (JSONL persistence)
- MonitoringEndpoints (Status APIs)

### Service Lifecycle
1. **Initialization**: Setup all managers
2. **Start**: Launch primary connection
3. **Subscribe**: Add instruments to connections
4. **Monitor**: Track health and performance
5. **Stop**: Graceful shutdown
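The lifecycle pairs every start with a stop. The following is a hedged sketch that enforces this pairing with `contextlib.contextmanager`, so an exception during the monitoring phase still shuts the connections down. `running` is a hypothetical helper. It assumes only that the service exposes `start(**kwargs) -> bool` and `stop()`, as `MarketDataService` does:

```python
from contextlib import contextmanager


@contextmanager
def running(service, **start_kwargs):
    """Start `service`, hand it to the with-block, and always call stop()."""
    started = service.start(**start_kwargs)
    try:
        if not started:
            raise RuntimeError("service failed to start")
        yield service
    finally:
        # Runs on normal exit, on exceptions, and after a failed start():
        # start_connection() stores the KiteTicker instance before connect(),
        # so a failed start can still leave a connection object behind.
        service.stop()
```

A typical call might be `with running(MarketDataService(api_key, access_token)) as service:` followed by the subscribe and monitor steps inside the block.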

### Usage Pattern
```python
service = MarketDataService(api_key, access_token)
service.start()
service.subscribe_trading_universe(tokens)
# ... service runs ...
service.stop()
```

class MarketDataService:
    """Main service class integrating all components"""
    
    def __init__(self, api_key: str, access_token: str, data_dir: str = './market_data'):
        """
        Initialize the market data service
        
        Args:
            api_key: Kite API key
            access_token: Kite access token
            data_dir: Directory for JSONL files
        """
        self.api_key = api_key
        self.access_token = access_token
        self.data_dir = data_dir
        
        # Initialize components
        self.storage_mgr = StorageManager(data_dir)
        self.conn_mgr = MultiConnectionManager(api_key, access_token, self.storage_mgr)
        self.sub_mgr = SubscriptionManager(self.conn_mgr)
        self.monitor = MonitoringEndpoints(self.conn_mgr, self.sub_mgr, self.storage_mgr)
        
        self.running = False
        
        logging.info("MarketDataService initialized")
    
    def start(self, activate_tertiary: bool = False) -> bool:
        """
        Start the market data service
        
        Args:
            activate_tertiary: Whether to start tertiary connection immediately
        
        Returns:
            bool: Success status
        """
        try:
            logging.info("Starting MarketDataService...")
            
            # Start primary connection (Connection 1)
            success = self.conn_mgr.start_connection(1)
            if not success:
                logging.error("Failed to start primary connection")
                return False
            
            time.sleep(2)  # Wait for connection to establish
            
            # Optionally start tertiary connection
            if activate_tertiary:
                if not self.conn_mgr.start_connection(3):
                    logging.warning("Tertiary connection failed to start; continuing with primary only")
                time.sleep(2)
            
            self.running = True
            logging.info("✅ MarketDataService started successfully")
            return True
            
        except Exception as e:
            logging.error(f"Failed to start service: {e}")
            return False
    
    def stop(self) -> bool:
        """
        Stop the market data service gracefully
        
        Returns:
            bool: Success status
        """
        try:
            logging.info("Stopping MarketDataService...")
            
            # Unsubscribe all instruments
            self.conn_mgr.unsubscribe_all()
            
            # Stop all connections
            for conn_id in [1, 2, 3]:
                if self.conn_mgr.connections[conn_id]:
                    self.conn_mgr.stop_connection(conn_id)
            
            self.running = False
            logging.info("✅ MarketDataService stopped successfully")
            return True
            
        except Exception as e:
            logging.error(f"Failed to stop service: {e}")
            return False
    
    def subscribe_trading_universe(self, tokens: List[int], batch_size: int = 500) -> Dict:
        """
        Subscribe to trading universe instruments on primary connection
        
        Args:
            tokens: List of instrument tokens
            batch_size: Number of instruments per batch
        
        Returns:
            dict: Subscription result
        """
        logging.info(f"Subscribing to trading universe: {len(tokens)} instruments")
        
        return self.sub_mgr.batch_subscribe(
            tokens=tokens,
            connection_id=1,
            batch_size=batch_size,
            mode='QUOTE'
        )
    
    def activate_full_mode(self, tokens: List[int]) -> Dict:
        """
        Activate full mode for specific instruments on secondary connection
        
        Args:
            tokens: List of instrument tokens
        
        Returns:
            dict: Subscription result
        """
        logging.info(f"Activating FULL mode for {len(tokens)} instruments")
        
        return self.sub_mgr.subscribe_to_secondary(tokens)
    
    def subscribe_instruments(self, tokens: List[int], mode: str = 'QUOTE',
                              connection_id: int = 1) -> Dict:
        """
        Subscribe instruments with validation
        
        Args:
            tokens: List of instrument tokens
            mode: Subscription mode (LTP, QUOTE, FULL)
            connection_id: Target connection (1, 2, or 3)
        
        Returns:
            dict: Subscription result
        """
        return self.sub_mgr.validate_and_subscribe(tokens, connection_id, mode)
    
    def unsubscribe_instruments(self, tokens: List[int]) -> Dict:
        """
        Unsubscribe instruments from all connections
        
        Args:
            tokens: List of instrument tokens
        
        Returns:
            dict: Unsubscription result
        """
        return self.conn_mgr.unsubscribe_instruments(tokens)
    
    def unsubscribe_all(self) -> Dict:
        """
        Unsubscribe all instruments from all connections
        
        Returns:
            dict: Unsubscription result
        """
        return self.conn_mgr.unsubscribe_all()
    
    def get_health(self) -> Dict:
        """Get service health status"""
        return self.monitor.health()
    
    def get_connections(self) -> Dict:
        """Get connection status"""
        return self.monitor.connections()
    
    def get_instruments(self) -> Dict:
        """Get instrument subscription details"""
        return self.monitor.instruments()
    
    def get_storage_status(self) -> Dict:
        """Get storage status"""
        return self.monitor.storage()
    
    def get_metrics(self) -> Dict:
        """Get performance metrics"""
        return self.monitor.metrics()
    
    def activate_redundancy(self) -> bool:
        """
        Activate tertiary connection for redundancy
        
        Returns:
            bool: Success status
        """
        return self.sub_mgr.activate_tertiary_connection()
    
    def get_status_summary(self) -> Dict:
        """
        Get comprehensive status summary
        
        Returns:
            dict: Complete status information
        """
        return {
            'health': self.get_health(),
            'connections': self.get_connections(),
            'instruments': self.get_instruments(),
            'storage': self.get_storage_status(),
            'metrics': self.get_metrics()
        }
    
    def wait_for_ticks(self, duration_seconds: int = 10, expected_min_ticks: int = 10) -> int:
        """
        Wait and verify tick reception
        
        Args:
            duration_seconds: How long to wait
            expected_min_ticks: Minimum ticks expected
        
        Returns:
            int: Number of ticks received across all connections during the wait
        """
        logging.info(f"Waiting {duration_seconds} seconds for ticks...")
        
        initial_ticks = sum(self.conn_mgr.tick_counters.values())
        time.sleep(duration_seconds)
        final_ticks = sum(self.conn_mgr.tick_counters.values())
        
        received = final_ticks - initial_ticks
        logging.info(f"Received {received} ticks in {duration_seconds} seconds")
        
        if received >= expected_min_ticks:
            logging.info("✅ Tick reception verified")
        else:
            logging.warning(f"⚠️ Expected at least {expected_min_ticks} ticks, got {received}")
        
        return received

print("✅ MarketDataService class defined")
print("\nKey methods:")
print("- start() / stop() - Service lifecycle")
print("- subscribe_trading_universe() - Subscribe multiple instruments")
print("- activate_full_mode() - Enable FULL mode on secondary connection")
print("- get_health() / get_connections() / get_instruments() - Monitoring")
print("- activate_redundancy() - Start tertiary connection")
print("- wait_for_ticks() - Verify tick reception")
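
In the usage pattern, `stop()` runs only if nothing before it raises. The test functions in the next section likewise call `service.stop()` on the success path only. Wrapping the lifecycle in a context manager guarantees cleanup on exceptions and on `KeyboardInterrupt`. The sketch below is an assumption and is not part of `MarketDataService`. The hypothetical `running_service()` accepts any object whose `start()` and `stop()` return `bool`, which matches `MarketDataService`.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol, TypeVar


class Startable(Protocol):
    def start(self) -> bool: ...
    def stop(self) -> bool: ...


S = TypeVar("S", bound=Startable)


@contextmanager
def running_service(service: S) -> Iterator[S]:
    """Start `service`, hand it to the with-block, and always stop it afterwards."""
    if not service.start():
        raise RuntimeError("service failed to start")
    try:
        yield service
    finally:
        # Runs on normal exit, on exceptions, and on KeyboardInterrupt
        service.stop()


# Usage with real credentials (shown as a comment so the block runs offline):
# with running_service(MarketDataService(api_key, access_token)) as svc:
#     svc.subscribe_trading_universe(tokens)
#     svc.wait_for_ticks(duration_seconds=10)
```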

<a id="section-11"></a>
## Section 11: Real-World Testing

### Testing Strategy
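1. **Connection Test**: Verify the primary connection starts and the service reports its health
2. **Subscription Test**: Subscribe to real instrument tokens
3. **Tick Reception**: Verify data is flowing
4. **Multi-Connection Test**: QUOTE mode on the primary connection, FULL mode on the secondary
5. **Validation Tests**: Reject duplicate subscriptions across connections and unsubscribe cleanly
6. **Storage Test**: Check JSONL files are created
7. **Failover Test**: Simulate connection failure (not yet automated in the test functions below)
8. **Performance Test**: Verify tick processing rate (not yet automated in the test functions below)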
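
For a performance check, you can sample the `tick_counters` dict that `wait_for_ticks()` already sums twice and compute a per-connection rate. The sketch below assumes the counters are cumulative per connection ID and that a plain `dict(...)` copy is an acceptable snapshot. `tick_rate()` and `meets_target()` are hypothetical helpers. This measures the rate at which the broker *sends* ticks. That rate depends on how many subscribed instruments are actively trading, so falling short of 1000 ticks/sec does not by itself indicate data loss.

```python
from typing import Dict, Tuple


def tick_rate(initial: Dict[int, int], final: Dict[int, int],
              elapsed_s: float) -> Tuple[Dict[int, float], float]:
    """Per-connection and total ticks/sec between two cumulative counter snapshots."""
    if elapsed_s <= 0:
        raise ValueError("elapsed_s must be positive")
    per_conn = {
        conn_id: (count - initial.get(conn_id, 0)) / elapsed_s
        for conn_id, count in final.items()
    }
    return per_conn, sum(per_conn.values())


def meets_target(total_tps: float, target_tps: float = 1000.0) -> bool:
    return total_tps >= target_tps


# In the notebook the snapshots would come from the service, e.g.:
#   before = dict(service.conn_mgr.tick_counters); time.sleep(10)
#   after = dict(service.conn_mgr.tick_counters)
before = {1: 100, 2: 0, 3: 0}
after = {1: 6100, 2: 4000, 3: 0}
per_conn, total = tick_rate(before, after, elapsed_s=10.0)
print(per_conn, total, meets_target(total))  # {1: 600.0, 2: 400.0, 3: 0.0} 1000.0 True
```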

### Sample Instrument Tokens
We'll use tokens from the example files:
- 408065, 738561, 408577 (from ticker.py)
- 81153, 119553, 70401, 415745, etc. (from requirements)

### Success Criteria (from requirements)
- ✅ Process 1000 ticks/second without data loss
- ✅ Handle network disconnections gracefully
- ✅ Store data reliably in JSONL files
- ✅ Manage subscriptions across 3 connections

In [None]:
# IMPORTANT: Set your credentials before running tests
# Replace with your actual API key and access token
TEST_API_KEY = "your_api_key_here"
TEST_ACCESS_TOKEN = "your_access_token_here"

# Sample instrument tokens from examples
SAMPLE_TOKENS = [
    408065,   # ACC
    738561,   # Example token
    408577,   # Another example
    81153,    # From requirements sample
    119553,   # From requirements sample
    70401,    # From requirements sample
    415745,   # From requirements sample
    3001089,  # From requirements sample
    1346049,  # From requirements sample
    175361,   # From requirements sample
    969473,   # From requirements sample
    1041153,  # From requirements sample
    1270529   # From requirements sample
]

def test_basic_connection():
    """Test 1: Basic connection establishment"""
    print("\n" + "="*60)
    print("TEST 1: Basic Connection Establishment")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        
        # Start service
        success = service.start()
        if not success:
            print("❌ Failed to start service")
            return False
        
        print("✅ Service started successfully")
        
        # Wait a moment
        time.sleep(3)
        
        # Check health
        health = service.get_health()
        print(f"\nHealth Status: {health['status']}")
        print(f"Active Connections: {health['active_connections']}")
        print(f"Kite Connected: {health['kite_connected']}")
        
        # Stop service
        service.stop()
        print("✅ Service stopped successfully")
        
        return True
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def test_subscription_and_ticks():
    """Test 2: Subscription and tick reception"""
    print("\n" + "="*60)
    print("TEST 2: Subscription and Tick Reception")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        service.start()
        time.sleep(3)
        
        # Subscribe to sample instruments
        print(f"\nSubscribing to {len(SAMPLE_TOKENS)} instruments...")
        result = service.subscribe_instruments(SAMPLE_TOKENS, mode='QUOTE', connection_id=1)
        
        if result['success']:
            print(f"✅ Subscribed successfully")
        else:
            print(f"❌ Subscription failed: {result.get('error')}")
            service.stop()
            return False
        
        # Wait for ticks
        print("\nWaiting 15 seconds for ticks...")
        received_ticks = service.wait_for_ticks(duration_seconds=15, expected_min_ticks=10)
        
        # Check instruments endpoint
        instruments = service.get_instruments()
        print(f"\nTotal subscribed instruments: {instruments['total_unique_instruments']}")
        
        # Check connections
        connections = service.get_connections()
        conn1 = connections['connections']['1']
        print(f"Connection 1 tick count: {conn1['tick_count']}")
        print(f"Connection 1 tick rate: {conn1['tick_rate']} ticks/sec")
        
        # Stop service
        service.stop()
        
        if received_ticks >= 10:
            print("\n✅ Test passed")
        else:
            # Soft pass: low tick counts are expected outside market hours
            print("\n⚠️ Soft pass: fewer ticks than expected (is the market open?)")
        return True
            
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def test_multiple_connections():
    """Test 3: Multiple connection management"""
    print("\n" + "="*60)
    print("TEST 3: Multiple Connection Management")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        service.start()
        time.sleep(3)
        
        # Subscribe to primary connection (QUOTE mode)
        print("\n1. Subscribing to primary connection (QUOTE mode)...")
        primary_tokens = SAMPLE_TOKENS[:5]
        result1 = service.subscribe_instruments(primary_tokens, mode='QUOTE', connection_id=1)
        print(f"   {'✅' if result1['success'] else '❌'} Primary subscription")
        
        # Activate full mode on secondary connection
        print("\n2. Activating FULL mode on secondary connection...")
        full_tokens = SAMPLE_TOKENS[5:8]
        result2 = service.activate_full_mode(full_tokens)
        print(f"   {'✅' if result2['success'] else '❌'} FULL mode activation")
        
        # Wait for ticks from both connections
        print("\n3. Waiting for ticks from both connections...")
        time.sleep(10)
        
        # Check connection status
        connections = service.get_connections()
        print("\nConnection Status:")
        for conn_id in [1, 2]:
            conn = connections['connections'][str(conn_id)]
            print(f"  Connection {conn_id}:")
            print(f"    - Active: {conn['active']}")
            print(f"    - Mode: {conn['mode']}")
            print(f"    - Instruments: {conn['subscribed_instruments']}")
            print(f"    - Tick Count: {conn['tick_count']}")
        
        # Stop service
        service.stop()
        print("\n✅ Test completed")
        return True
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def test_duplicate_validation():
    """Test 4: Duplicate subscription validation"""
    print("\n" + "="*60)
    print("TEST 4: Duplicate Subscription Validation")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        service.start()
        time.sleep(3)
        
        # Subscribe tokens to connection 1
        tokens = SAMPLE_TOKENS[:3]
        print(f"\n1. Subscribing {tokens} to Connection 1...")
        result1 = service.subscribe_instruments(tokens, connection_id=1)
        print(f"   {'✅' if result1['success'] else '❌'} First subscription")
        
        # Try to subscribe same tokens to connection 2 (should fail)
        print(f"\n2. Attempting to subscribe same tokens to Connection 2...")
        result2 = service.subscribe_instruments(tokens, connection_id=2)
        
        if not result2['success']:
            print(f"   ✅ Duplicate validation working: {result2.get('error')}")
            if 'duplicates' in result2:
                print(f"   Duplicates found: {len(result2['duplicates'])}")
        else:
            print(f"   ❌ Duplicate validation failed - subscription should have been rejected")
        
        # Stop service
        service.stop()
        print("\n✅ Test completed")
        return not result2['success']  # Success if second subscription failed
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def test_unsubscribe():
    """Test 5: Unsubscription functionality"""
    print("\n" + "="*60)
    print("TEST 5: Unsubscription Functionality")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        service.start()
        time.sleep(3)
        
        # Subscribe tokens
        tokens = SAMPLE_TOKENS[:5]
        print(f"\n1. Subscribing {len(tokens)} instruments...")
        service.subscribe_instruments(tokens, connection_id=1)
        time.sleep(2)
        
        instruments_before = service.get_instruments()
        print(f"   Instruments subscribed: {instruments_before['total_unique_instruments']}")
        
        # Unsubscribe some tokens
        to_unsubscribe = tokens[:2]
        print(f"\n2. Unsubscribing {len(to_unsubscribe)} instruments...")
        result = service.unsubscribe_instruments(to_unsubscribe)
        print(f"   {'✅' if result['success'] else '❌'} Unsubscription")
        
        time.sleep(2)
        
        instruments_after = service.get_instruments()
        print(f"   Instruments remaining: {instruments_after['total_unique_instruments']}")
        
        # Unsubscribe all
        print(f"\n3. Unsubscribing all remaining instruments...")
        result_all = service.unsubscribe_all()
        print(f"   {'✅' if result_all['success'] else '❌'} Unsubscribe all")
        
        time.sleep(2)
        
        instruments_final = service.get_instruments()
        print(f"   Instruments remaining: {instruments_final['total_unique_instruments']}")
        
        # Stop service
        service.stop()
        print("\n✅ Test completed")
        return True
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def test_storage():
    """Test 6: JSONL storage verification"""
    print("\n" + "="*60)
    print("TEST 6: JSONL Storage Verification")
    print("="*60)
    
    try:
        service = MarketDataService(TEST_API_KEY, TEST_ACCESS_TOKEN)
        service.start()
        time.sleep(3)
        
        # Subscribe and collect ticks
        print("\n1. Subscribing and collecting ticks...")
        service.subscribe_instruments(SAMPLE_TOKENS[:5], connection_id=1)
        time.sleep(10)
        
        # Check storage status
        storage = service.get_storage_status()
        print(f"\n2. Storage Status:")
        print(f"   Current file: {storage['current_file']}")
        print(f"   Total files: {storage['total_files']}")
        print(f"   Ticks today: {storage['total_ticks_today']}")
        
        # Verify file exists
        current_file = service.storage_mgr.get_current_filepath()
        if os.path.exists(current_file):
            print(f"\n3. ✅ JSONL file exists: {current_file}")
            
            # Read and display sample
            with open(current_file, 'r') as f:
                lines = f.readlines()
                print(f"   Total lines in file: {len(lines)}")
                
                if lines:
                    print(f"\n4. Sample tick data:")
                    sample = json.loads(lines[0])
                    print(json.dumps(sample, indent=2))
        else:
            print(f"\n3. ❌ JSONL file not found: {current_file}")
        
        # Stop service
        service.stop()
        print("\n✅ Test completed")
        return True
        
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False

def run_all_tests():
    """Run all tests sequentially"""
    print("\n" + "#"*60)
    print("# RUNNING ALL TESTS")
    print("#"*60)
    
    tests = [
        ("Basic Connection", test_basic_connection),
        ("Subscription & Ticks", test_subscription_and_ticks),
        ("Multiple Connections", test_multiple_connections),
        ("Duplicate Validation", test_duplicate_validation),
        ("Unsubscription", test_unsubscribe),
        ("Storage Verification", test_storage)
    ]
    
    results = []
    
    for test_name, test_func in tests:
        print(f"\n\n{'='*60}")
        print(f"Running: {test_name}")
        print('='*60)
        
        try:
            result = test_func()
            results.append((test_name, result))
            time.sleep(2)  # Pause between tests
        except Exception as e:
            print(f"❌ Test '{test_name}' crashed: {e}")
            results.append((test_name, False))
    
    # Summary
    print("\n\n" + "#"*60)
    print("# TEST SUMMARY")
    print("#"*60)
    
    for test_name, result in results:
        status = "✅ PASSED" if result else "❌ FAILED"
        print(f"{status}: {test_name}")
    
    passed = sum(1 for _, result in results if result)
    total = len(results)
    print(f"\nTotal: {passed}/{total} tests passed")
    
    return passed == total

print("✅ Test functions defined")
print("\nAvailable tests:")
print("- test_basic_connection()")
print("- test_subscription_and_ticks()")
print("- test_multiple_connections()")
print("- test_duplicate_validation()")
print("- test_unsubscribe()")
print("- test_storage()")
print("- run_all_tests()  # Run everything")
print("\n⚠️ Remember to set TEST_API_KEY and TEST_ACCESS_TOKEN before running!")
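
Several tests above receive ticks only while the exchange is open. This is why `test_subscription_and_ticks()` still returns `True` when fewer ticks than expected arrive. A small market-hours check lets a test run report up front whether tick-count checks are meaningful. The sketch below is hypothetical. `is_market_open()` uses the 9:15 AM to 3:30 PM IST window listed in `diagnose_tick_flow()` and a Monday to Friday rule. It does not account for exchange holidays or special sessions. It imports `datetime.time` as `dtime` so it does not shadow the `time` module that the notebook uses for `time.sleep`.

```python
from datetime import datetime, timedelta, timezone
from datetime import time as dtime

# India does not observe daylight saving time, so a fixed UTC+5:30 offset is enough
IST = timezone(timedelta(hours=5, minutes=30))
MARKET_OPEN = dtime(9, 15)
MARKET_CLOSE = dtime(15, 30)


def is_market_open(now: datetime) -> bool:
    """True on weekdays between 9:15 and 15:30 IST. Exchange holidays are NOT handled."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    local = now.astimezone(IST)
    if local.weekday() >= 5:  # Saturday = 5, Sunday = 6
        return False
    return MARKET_OPEN <= local.time() < MARKET_CLOSE  # closing time is exclusive


if not is_market_open(datetime.now(timezone.utc)):
    print("Market closed: tick-count checks in the tests may soft-pass")
```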

<a id="section-12"></a>
## Section 12: Usage Examples & Utilities

### Common Usage Patterns
This section provides:
- Complete usage examples
- JSONL data analysis utilities
- Health monitoring loops
- Graceful shutdown patterns
- Troubleshooting helpers

### Production Checklist
- [ ] Set valid API credentials
- [ ] Configure data directory
- [ ] Test with small instrument set first
- [ ] Monitor tick reception rate
- [ ] Set up log rotation
- [ ] Plan for file cleanup/archival

In [None]:
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def read_jsonl_to_dataframe(filepath: str, max_rows: Optional[int] = None) -> pd.DataFrame:
    """
    Read JSONL file into pandas DataFrame
    
    Args:
        filepath: Path to JSONL file
        max_rows: Maximum rows to read (None for all)
    
    Returns:
        DataFrame with tick data
    """
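    data = []
    
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            if max_rows is not None and len(data) >= max_rows:
                break
            line = line.strip()
            if not line:
                continue  # skip blank lines
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                # Skip a malformed record, e.g. a last line the service is still writing
                continue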
    
    df = pd.DataFrame(data)
    
    # Convert timestamp to datetime
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    return df

def analyze_tick_data(df: pd.DataFrame) -> Dict:
    """
    Analyze tick data from DataFrame
    
    Args:
        df: DataFrame with tick data
    
    Returns:
        dict: Analysis results
    """
    analysis = {
        'total_ticks': len(df),
        'unique_instruments': df['instrument_token'].nunique() if 'instrument_token' in df.columns else 0,
        'time_range': {
            'start': df['timestamp'].min().isoformat() if 'timestamp' in df.columns else None,
            'end': df['timestamp'].max().isoformat() if 'timestamp' in df.columns else None
        },
        'modes': df['mode'].value_counts().to_dict() if 'mode' in df.columns else {},
    }
    
    # Price statistics
    if 'last_price' in df.columns:
        analysis['price_stats'] = {
            'min': float(df['last_price'].min()),
            'max': float(df['last_price'].max()),
            'mean': float(df['last_price'].mean())
        }
    
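    # Volume statistics: volume_traded is the cumulative day volume per instrument,
    # so take each instrument's latest (max) value instead of summing every tick
    if 'volume_traded' in df.columns and 'instrument_token' in df.columns:
        latest_volume = df.groupby('instrument_token')['volume_traded'].max()
        analysis['volume_stats'] = {
            'total': int(latest_volume.sum()),
            'mean_per_instrument': float(latest_volume.mean())
        }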
    
    return analysis

def monitor_service_health(service: MarketDataService, duration_seconds: int = 60,
                           interval_seconds: int = 5):
    """
    Monitor service health in a loop
    
    Args:
        service: MarketDataService instance
        duration_seconds: How long to monitor
        interval_seconds: Check interval
    """
    print(f"\n{'='*60}")
    print(f"Monitoring service health for {duration_seconds} seconds")
    print(f"Check interval: {interval_seconds} seconds")
    print('='*60)
    
    end_time = time.time() + duration_seconds
    iteration = 0
    
    while time.time() < end_time:
        iteration += 1
        print(f"\n--- Check #{iteration} ---")
        
        health = service.get_health()
        metrics = service.get_metrics()
        
        print(f"Status: {health['status']}")
        print(f"Active Connections: {health['active_connections']}/3")
        print(f"Subscribed Instruments: {health['subscribed_instruments']}")
        print(f"Total Ticks: {metrics['total_ticks_received']}")
        print(f"Avg Tick Rate: {metrics['average_tick_rate']} ticks/sec")
        print(f"Performance: {metrics['performance_pct']}% of target (1000 ticks/sec)")
        
        if health['last_tick_time']:
            last_tick = datetime.fromisoformat(health['last_tick_time'])
            seconds_ago = (datetime.now() - last_tick).total_seconds()
            print(f"Last Tick: {seconds_ago:.1f}s ago")
        
        time.sleep(interval_seconds)
    
    print(f"\n{'='*60}")
    print("Monitoring complete")
    print('='*60)

def graceful_shutdown_example(service: MarketDataService):
    """
    Example of graceful shutdown with cleanup
    
    Args:
        service: MarketDataService instance
    """
    print("\n" + "="*60)
    print("Performing Graceful Shutdown")
    print("="*60)
    
    try:
        # Get final metrics
        print("\n1. Collecting final metrics...")
        metrics = service.get_metrics()
        print(f"   Total ticks received: {metrics['total_ticks_received']}")
        print(f"   Uptime: {metrics['uptime_seconds']} seconds")
        
        # Get storage status
        print("\n2. Checking storage...")
        storage = service.get_storage_status()
        print(f"   Files created: {storage['total_files']}")
        print(f"   Ticks stored today: {storage['total_ticks_today']}")
        
        # Unsubscribe all
        print("\n3. Unsubscribing all instruments...")
        service.unsubscribe_all()
        time.sleep(2)
        
        # Stop service
        print("\n4. Stopping service...")
        service.stop()
        
        print("\n✅ Graceful shutdown complete")
        
    except Exception as e:
        print(f"\n❌ Error during shutdown: {e}")

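def create_monitoring_dashboard(service: MarketDataService, clear_screen: bool = False):
    """
    Create a simple text-based monitoring dashboard
    
    Args:
        service: MarketDataService instance
        clear_screen: Clear the terminal before drawing (useful when redrawing in a
            loop; has no effect on Jupyter cell output)
    """
    if clear_screen:
        os.system('clear' if os.name == 'posix' else 'cls')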
    
    print("╔" + "="*78 + "╗")
    print("║" + " "*20 + "MARKET DATA SERVICE DASHBOARD" + " "*29 + "║")
    print("╚" + "="*78 + "╝")
    
    # Health
    health = service.get_health()
    print(f"\n📊 SERVICE HEALTH")
    print(f"   Status: {health['status'].upper()}")
    print(f"   Uptime: {health['uptime_human']}")
    print(f"   Last Tick: {health['last_tick_time'] or 'N/A'}")
    
    # Connections
    connections = service.get_connections()
    print(f"\n🔌 CONNECTIONS")
    for conn_id in ['1', '2', '3']:
        conn = connections['connections'][conn_id]
        status_icon = "🟢" if conn['active'] else "🔴"
        print(f"   {status_icon} Connection {conn_id} ({conn['role']})")
        print(f"      Mode: {conn['mode']}, Instruments: {conn['subscribed_instruments']}, Ticks: {conn['tick_count']}")
    
    # Metrics
    metrics = service.get_metrics()
    print(f"\n📈 PERFORMANCE METRICS")
    print(f"   Total Ticks: {metrics['total_ticks_received']:,}")
    print(f"   Tick Rate: {metrics['average_tick_rate']} ticks/sec")
    print(f"   Target Achievement: {metrics['performance_pct']}%")
    
    # Storage
    storage = service.get_storage_status()
    print(f"\n💾 STORAGE")
    print(f"   Current File: {storage['current_file']}")
    print(f"   Ticks Today: {storage['total_ticks_today']:,}")
    
    print("\n" + "="*80)

# ============================================================================
# COMPLETE USAGE EXAMPLE
# ============================================================================

def complete_usage_example():
    """
    Complete end-to-end usage example
    
    This demonstrates the full lifecycle of the market data service
    """
    print("\n" + "#"*80)
    print("# COMPLETE USAGE EXAMPLE")
    print("#"*80)
    
    # Configuration
    API_KEY = "your_api_key"
    ACCESS_TOKEN = "your_access_token"
    DATA_DIR = "./market_data"
    
    # Instrument tokens (replace with your actual tokens)
    TRADING_UNIVERSE = SAMPLE_TOKENS
    
    try:
        # Step 1: Initialize service
        print("\n1️⃣  Initializing Market Data Service...")
        service = MarketDataService(API_KEY, ACCESS_TOKEN, DATA_DIR)
        
        # Step 2: Start service
        print("\n2️⃣  Starting service...")
        if not service.start():
            print("❌ Failed to start service")
            return
        
        print("✅ Service started")
        time.sleep(3)
        
        # Step 3: Subscribe to trading universe
        print(f"\n3️⃣  Subscribing to {len(TRADING_UNIVERSE)} instruments...")
        result = service.subscribe_trading_universe(TRADING_UNIVERSE, batch_size=100)
        print(f"✅ Subscribed: {result['successful_batches']}/{result['total_batches']} batches")
        
        # Step 4: Monitor for a period
        print("\n4️⃣  Monitoring tick reception (30 seconds)...")
        monitor_service_health(service, duration_seconds=30, interval_seconds=10)
        
        # Step 5: Activate full mode for specific instruments
        print("\n5️⃣  Activating FULL mode for select instruments...")
        full_mode_tokens = TRADING_UNIVERSE[:3]
        result = service.activate_full_mode(full_mode_tokens)
        if result['success']:
            print(f"✅ FULL mode activated for {len(full_mode_tokens)} instruments")
        
        # Step 6: Wait and collect more data
        print("\n6️⃣  Collecting data for 20 more seconds...")
        time.sleep(20)
        
        # Step 7: Display dashboard
        print("\n7️⃣  Service Dashboard:")
        create_monitoring_dashboard(service)
        
        # Step 8: Analyze stored data
        print("\n8️⃣  Analyzing stored data...")
        storage = service.get_storage_status()
        if storage['total_ticks_today'] > 0:
            current_file = service.storage_mgr.get_current_filepath()
            df = read_jsonl_to_dataframe(current_file, max_rows=1000)
            analysis = analyze_tick_data(df)
            
            print(f"\n   Analysis Results:")
            print(f"   - Total ticks analyzed: {analysis['total_ticks']}")
            print(f"   - Unique instruments: {analysis['unique_instruments']}")
            print(f"   - Modes: {analysis['modes']}")
            if 'price_stats' in analysis:
                print(f"   - Price range: ₹{analysis['price_stats']['min']:.2f} - ₹{analysis['price_stats']['max']:.2f}")
        
        # Step 9: Graceful shutdown
        print("\n9️⃣  Performing graceful shutdown...")
        graceful_shutdown_example(service)
        
        print("\n" + "#"*80)
        print("# EXAMPLE COMPLETE")
        print("#"*80)
        
    except KeyboardInterrupt:
        print("\n\n⚠️  Interrupted by user")
        if 'service' in locals():
            service.stop()
    except Exception as e:
        print(f"\n❌ Error: {e}")
        if 'service' in locals():
            service.stop()

# ============================================================================
# TROUBLESHOOTING HELPERS
# ============================================================================

def troubleshoot_connection(service: MarketDataService, connection_id: int):
    """
    Troubleshoot a specific connection
    
    Args:
        service: MarketDataService instance
        connection_id: Connection to troubleshoot (1, 2, or 3)
    """
    print(f"\n{'='*60}")
    print(f"Troubleshooting Connection {connection_id}")
    print('='*60)
    
    kws = service.conn_mgr.connections.get(connection_id)
    
    if kws is None:
        print(f"❌ Connection {connection_id} not initialized")
        print(f"   Solution: Call service.conn_mgr.start_connection({connection_id})")
        return
    
    print(f"✅ Connection object exists")
    
    # Check if connected
    is_connected = kws.is_connected()
    print(f"{'✅' if is_connected else '❌'} Connected: {is_connected}")
    
    if not is_connected:
        print("\n   Possible issues:")
        print("   1. Network connectivity problems")
        print("   2. Invalid API credentials")
        print("   3. Connection closed unexpectedly")
        print("\n   Solutions:")
        print("   - Check internet connection")
        print("   - Verify API_KEY and ACCESS_TOKEN")
        print("   - Connection will auto-reconnect (wait a moment)")
        print(f"   - Manual restart: service.conn_mgr.stop_connection({connection_id}), then start_connection({connection_id})")
    
    # Check subscription count
    with service.conn_mgr.lock:
        sub_count = len(service.conn_mgr.subscriptions[connection_id])
    
    print(f"\n📊 Subscriptions: {sub_count}")
    
    if sub_count == 0:
        print("   ℹ️  No instruments subscribed")
        print(f"   Solution: service.subscribe_instruments(tokens, connection_id={connection_id})")
    
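    # Check tick reception (read under the same lock as the subscription count
    # so both values come from one consistent snapshot)
    with service.conn_mgr.lock:
        tick_count = service.conn_mgr.tick_counters[connection_id]
        last_tick = service.conn_mgr.last_tick_time[connection_id]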
    
    print(f"\n📈 Tick Count: {tick_count}")
    
    if last_tick:
        seconds_ago = (datetime.now() - last_tick).total_seconds()
        print(f"⏱️  Last Tick: {seconds_ago:.1f}s ago")
        
        if seconds_ago > 10 and is_connected and sub_count > 0:
            print("   ⚠️  No recent ticks despite being connected")
            print("   Possible issues:")
            print("   - Market is closed")
            print("   - Subscribed instruments not trading")
            print("   - Connection stalled (try a manual stop/start of this connection)")
    else:
        print("⏱️  Last Tick: Never")
        if is_connected and sub_count > 0:
            print("   ℹ️  Connected with subscriptions but no ticks yet")
            print("   Solution: Wait a few seconds, or check if market is open")
    
    print('='*60)
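
# ----------------------------------------------------------------------------
# Sketch: classifying a silent connection
# ----------------------------------------------------------------------------
# troubleshoot_connection() above treats "no tick for more than 10 seconds
# while connected and subscribed" as suspicious and lists a closed market as
# the first likely cause. The helpers below are a minimal, hypothetical sketch
# that separates those cases before anyone starts debugging sockets. They
# assume NSE cash-market hours of 9:15 AM - 3:30 PM IST, Monday to Friday, and
# deliberately ignore exchange holidays and special sessions, so treat the
# market-open check as an approximation. Neither function is part of
# MarketDataService. If conn_mgr.last_tick_time stores naive local timestamps
# (troubleshoot_connection() compares it with datetime.now()), make them
# timezone-aware first, e.g. last_tick.astimezone().

from datetime import datetime, time as dtime, timedelta, timezone
from typing import Optional

# IST has no daylight-saving time, so a fixed +05:30 offset is accurate.
IST_TZ = timezone(timedelta(hours=5, minutes=30))
NSE_OPEN = dtime(9, 15)
NSE_CLOSE = dtime(15, 30)


def is_market_open(now: datetime) -> bool:
    """Return True if `now` (timezone-aware) falls inside regular hours.

    Hypothetical helper: exchange holidays are NOT considered.
    """
    local = now.astimezone(IST_TZ)
    if local.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return False
    return NSE_OPEN <= local.time() < NSE_CLOSE


def classify_tick_silence(last_tick: Optional[datetime], now: datetime,
                          stale_after_seconds: float = 10.0) -> str:
    """Label tick flow as 'market_closed', 'never_received', 'stale' or 'fresh'.

    Both datetimes must be timezone-aware so they can be compared.
    """
    if not is_market_open(now):
        return "market_closed"
    if last_tick is None:
        return "never_received"
    if (now - last_tick).total_seconds() > stale_after_seconds:
        return "stale"
    return "fresh"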

def diagnose_tick_flow():
    """
    General diagnostics for tick data flow
    """
    print("\n" + "="*60)
    print("TICK FLOW DIAGNOSTICS")
    print("="*60)
    
    checklist = {
        "API Credentials": "Have you set valid API_KEY and ACCESS_TOKEN?",
        "Network": "Is your internet connection stable?",
        "Market Hours": "Is the market currently open? (9:15 AM - 3:30 PM IST, Mon-Fri)",
        "Subscriptions": "Have you subscribed to instruments?",
        "Connection": "Is at least one connection active?",
        "Valid Tokens": "Are you using valid instrument tokens?",
        "Rate Limits": "Have you exceeded API rate limits?",
    }
    
    for item, question in checklist.items():
        print(f"\n☐ {item}")
        print(f"   {question}")
    
    print("\n" + "="*60)
    print("Common Issues & Solutions:")
    print("="*60)
    
    issues = [
        {
            "issue": "No ticks received",
            "causes": [
                "Market is closed",
                "Invalid instrument tokens",
                "No subscriptions active",
                "Connection not established"
            ],
            "solutions": [
                "Check market hours (9:15 AM - 3:30 PM IST Mon-Fri)",
                "Verify instrument tokens are valid",
                "Call service.subscribe_instruments(tokens)",
                "Check service.get_health()['kite_connected']"
            ]
        },
        {
            "issue": "Connection keeps disconnecting",
            "causes": [
                "Network instability",
                "Invalid credentials",
                "Multiple connections with same API key"
            ],
            "solutions": [
                "Check internet connection stability",
                "Regenerate access token",
                "Ensure no other instances are running"
            ]
        },
        {
            "issue": "Duplicate subscription error",
            "causes": [
                "Instrument already subscribed on another connection"
            ],
            "solutions": [
                "Check service.get_instruments() for existing subscriptions",
                "Unsubscribe first: service.unsubscribe_instruments(tokens)"
            ]
        },
        {
            "issue": "Capacity exceeded",
            "causes": [
                "More than 3000 instruments on single connection"
            ],
            "solutions": [
                "Distribute across multiple connections",
                "Check capacity: service.conn_mgr.get_available_capacity(conn_id)"
            ]
        }
    ]
    
    for idx, item in enumerate(issues, 1):
        print(f"\n{idx}. {item['issue']}")
        print("   Causes:")
        for cause in item['causes']:
            print(f"     - {cause}")
        print("   Solutions:")
        for solution in item['solutions']:
            print(f"     - {solution}")
    
    print("\n" + "="*60)
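
# ----------------------------------------------------------------------------
# Sketch: planning token placement before subscribing
# ----------------------------------------------------------------------------
# Two of the issues listed by diagnose_tick_flow() ("Duplicate subscription
# error" and "Capacity exceeded") can be caught before any subscribe call.
# plan_token_placement() is a hypothetical, pure-Python sketch: given the
# tokens already subscribed per connection and a per-connection limit (3000
# by default, matching the limit used throughout this guide), it skips
# duplicates and fills connections in the priority order given. It only
# plans; it does not call MarketDataService or the broker API.

from typing import Dict, Iterable, List, Set


def plan_token_placement(
    new_tokens: Iterable[int],
    existing: Dict[int, Set[int]],
    order: List[int],
    max_per_connection: int = 3000,
) -> Dict[str, object]:
    """Return {'assignments': {conn_id: [tokens]}, 'duplicates': [...],
    'unplaced': [...]} without mutating `existing`."""
    already: Set[int] = set().union(*existing.values())
    free = {cid: max_per_connection - len(existing.get(cid, set()))
            for cid in order}
    assignments: Dict[int, List[int]] = {cid: [] for cid in order}
    duplicates: List[int] = []
    unplaced: List[int] = []
    seen: Set[int] = set()

    for token in new_tokens:
        if token in already or token in seen:
            duplicates.append(token)  # already subscribed, or repeated in input
            continue
        seen.add(token)
        # First connection (in priority order) that still has room
        target = next((cid for cid in order if free[cid] > 0), None)
        if target is None:
            unplaced.append(token)  # every listed connection is full
            continue
        assignments[target].append(token)
        free[target] -= 1

    return {"assignments": assignments, "duplicates": duplicates,
            "unplaced": unplaced}

# Usage sketch (assumes conn_mgr.subscriptions maps connection id to an
# iterable of tokens, as troubleshoot_connection() suggests):
#   with service.conn_mgr.lock:
#       existing = {cid: set(s) for cid, s in service.conn_mgr.subscriptions.items()}
#   plan = plan_token_placement(tokens, existing, order=[1, 2, 3])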

def export_data_analysis(service: MarketDataService, output_file: str = "analysis_report.json"):
    """
    Export comprehensive analysis of collected data
    
    Args:
        service: MarketDataService instance
        output_file: Output filename for analysis report
    """
    print(f"\n{'='*60}")
    print("Exporting Data Analysis Report")
    print('='*60)
    
    try:
        # Get current file
        current_file = service.storage_mgr.get_current_filepath()
        
        if not current_file or not os.path.exists(current_file):
            print("❌ No data file found")
            return
        
        # Read data
        print("📖 Reading tick data...")
        df = read_jsonl_to_dataframe(current_file)
        
        if df.empty:
            print("❌ Data file contains no ticks yet")
            return
        
        # Analyze
        print("📊 Analyzing...")
        analysis = analyze_tick_data(df)
        
        # Add service metrics
        metrics = service.get_metrics()
        health = service.get_health()
        connections = service.get_connections()
        
        report = {
            'generated_at': datetime.now().isoformat(),
            'data_analysis': analysis,
            'service_metrics': metrics,
            'health_status': health,
            'connection_status': connections,
            'file_analyzed': current_file,
            'total_rows': len(df)
        }
        
        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        print(f"✅ Report saved to: {output_file}")
        
        # Print summary
        print("\n📋 Summary:")
        print(f"   Total Ticks: {len(df):,}")
        print(f"   Unique Instruments: {analysis['unique_instruments']}")
        print(f"   Time Range: {analysis['time_range']['start']} to {analysis['time_range']['end']}")
        print(f"   Avg Tick Rate: {metrics['average_tick_rate']} ticks/sec")
        
    except Exception as e:
        print(f"❌ Error exporting analysis: {e}")
    
    print('='*60)
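
# ----------------------------------------------------------------------------
# Sketch: reading a JSONL file that is still being written
# ----------------------------------------------------------------------------
# export_data_analysis() reads the *current* storage file while the service
# may still be appending to it, so the last line can be half-written. This is
# a minimal, hypothetical sketch of a tolerant reader: it parses one JSON
# object per line, skips blank lines, and counts (rather than raises on) lines
# that fail to parse. Its records could be passed to pandas.DataFrame(records)
# if a DataFrame is needed. read_jsonl_to_dataframe() is defined elsewhere in
# this guide and may already handle this case differently.

import json
from typing import Any, Dict, List, Tuple


def read_jsonl_tolerant(filepath: str) -> Tuple[List[Dict[str, Any]], int]:
    """Return (records, skipped_line_count) for a JSON Lines file."""
    records: List[Dict[str, Any]] = []
    skipped = 0
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # ignore blank lines
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                skipped += 1  # typically a partially flushed final line
    return records, skipped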

# ============================================================================
# QUICK START TEMPLATE
# ============================================================================

def quick_start_template():
    """
    Quick start template - copy and modify this for your use case
    """
    template = '''
# ============================================================================
# QUICK START TEMPLATE - Market Data Service
# ============================================================================

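import time

# MarketDataService, read_jsonl_to_dataframe and analyze_tick_data are
# defined earlier in this guide; import them from wherever you saved them.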

# 1. CONFIGURATION
API_KEY = "your_api_key_here"
ACCESS_TOKEN = "your_access_token_here"
DATA_DIR = "./market_data"

# Your instrument tokens
INSTRUMENT_TOKENS = [
    408065,   # Replace with your instruments
    738561,
    408577,
    # Add more tokens...
]

# 2. INITIALIZE SERVICE
service = MarketDataService(API_KEY, ACCESS_TOKEN, DATA_DIR)

# 3. START SERVICE
if service.start():
    print("✅ Service started")
else:
    print("❌ Failed to start")
    raise SystemExit(1)

# Wait for connection
time.sleep(3)

# 4. SUBSCRIBE TO INSTRUMENTS
result = service.subscribe_trading_universe(INSTRUMENT_TOKENS, batch_size=100)
print(f"Subscribed: {result['successful_batches']}/{result['total_batches']} batches")

# 5. MONITOR (Optional)
# monitor_service_health(service, duration_seconds=60, interval_seconds=10)

# 6. LET IT RUN
try:
    print("Service running... Press Ctrl+C to stop")
    while True:
        # Check health every 30 seconds
        health = service.get_health()
        print(f"Status: {health['status']}, Ticks: {service.get_metrics()['total_ticks_received']}")
        time.sleep(30)
except KeyboardInterrupt:
    print("\\nStopping service...")
finally:
    # 7. GRACEFUL SHUTDOWN: runs on Ctrl+C and on any unexpected error
    service.stop()
    print("✅ Service stopped")

# 8. ANALYZE DATA (Optional)
# current_file = service.storage_mgr.get_current_filepath()
# df = read_jsonl_to_dataframe(current_file)
# analysis = analyze_tick_data(df)
# print(analysis)
'''
    
    print(template)
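
# ----------------------------------------------------------------------------
# Sketch: what batch_size means for subscription batches
# ----------------------------------------------------------------------------
# The template calls subscribe_trading_universe(tokens, batch_size=100) and
# reports successful_batches / total_batches. Assuming the tokens are split
# into consecutive fixed-size chunks (the actual splitting lives in
# MarketDataService, shown earlier in this guide), the batch count is
# ceil(len(tokens) / batch_size). These hypothetical helpers only illustrate
# that arithmetic; they are not the service's implementation.

from typing import List, Sequence


def chunk_tokens(tokens: Sequence[int], batch_size: int) -> List[List[int]]:
    """Split `tokens` into consecutive batches of at most `batch_size`."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    return [list(tokens[i:i + batch_size])
            for i in range(0, len(tokens), batch_size)]


def expected_batch_count(n_tokens: int, batch_size: int) -> int:
    """Number of batches chunk_tokens() produces, using integer ceil division."""
    return (n_tokens + batch_size - 1) // batch_size

# Example: 250 tokens with batch_size=100 gives batches of 100, 100 and 50,
# so total_batches would be 3.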

# ============================================================================
# PRINT USAGE GUIDE
# ============================================================================

print("✅ Usage examples and utilities loaded")
print("\n" + "="*80)
print("AVAILABLE FUNCTIONS")
print("="*80)

print("\n📖 Data Analysis:")
print("   - read_jsonl_to_dataframe(filepath) - Read JSONL into DataFrame")
print("   - analyze_tick_data(df) - Analyze tick data statistics")
print("   - export_data_analysis(service) - Generate analysis report")

print("\n🔍 Monitoring:")
print("   - monitor_service_health(service, duration, interval) - Monitor in loop")
print("   - create_monitoring_dashboard(service) - Display text dashboard")

print("\n🛠️ Troubleshooting:")
print("   - troubleshoot_connection(service, conn_id) - Diagnose connection issues")
print("   - diagnose_tick_flow() - General tick flow diagnostics")

print("\n📝 Examples:")
print("   - complete_usage_example() - Full end-to-end example")
print("   - quick_start_template() - Print quick start code template")
print("   - graceful_shutdown_example(service) - Shutdown example")

print("\n" + "="*80)
print("READY TO USE!")
print("="*80)
print("\nTo get started:")
print("1. Set API_KEY and ACCESS_TOKEN in test functions")
print("2. Run: complete_usage_example()")
print("   OR")
print("   Create your own service instance and start experimenting")
print("\nFor troubleshooting, run: diagnose_tick_flow()")
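
# ----------------------------------------------------------------------------
# Sketch: stopping on SIGTERM as well as Ctrl+C
# ----------------------------------------------------------------------------
# The quick start template only catches KeyboardInterrupt (SIGINT / Ctrl+C).
# Process managers such as systemd or `docker stop` send SIGTERM instead,
# whose default action ends the process without reaching service.stop().
# A minimal sketch using only the standard library: both signals set a
# threading.Event, and the main loop waits on that event instead of calling
# time.sleep(). install_stop_handlers() is a hypothetical helper, and signal
# handlers can only be installed from the main thread.

import signal
import threading


def install_stop_handlers() -> threading.Event:
    """Return an Event that is set when SIGINT or SIGTERM arrives."""
    stop_event = threading.Event()

    def _handle(signum, frame):
        # Only flag the request; the main loop performs the actual shutdown.
        stop_event.set()

    signal.signal(signal.SIGINT, _handle)
    signal.signal(signal.SIGTERM, _handle)
    return stop_event

# Usage inside the template's main loop (sketch):
#   stop_event = install_stop_handlers()
#   try:
#       while not stop_event.wait(timeout=30):  # True once a signal arrives
#           print(service.get_health()['status'])
#   finally:
#       service.stop()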