# Experiment: Greeks Streaming through TastyTrade API

In [1]:
import pandas as pd
import numpy as np
import requests
import shelve
import pytz
import time
import schedule
import smtplib

from datetime import datetime, timedelta
from pandas_market_calendars import get_calendar
from dynaconf import Dynaconf
from typing import Literal
from pathlib import Path
from loguru import logger
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Set up a logging directory
log_dir = Path(r'C:\Users\marwi\PycharmProjects\selling-volatility\src\logs')
log_dir.mkdir(exist_ok=True)

# Create log file path with timestamp
log_file = log_dir / f"tastytrade_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# Configure logger to write to both console and file
logger.add(log_file, rotation="1 day")

# Load settings and secrets
settings = Dynaconf(
    settings_files=['settings.json', '.secrets.json'],
)

EnvironmentType = Literal['sandbox', 'production']  # create a type alias

# ENVIRONMENT toggles between sandbox (testing) and production (live trading)
ENVIRONMENT: EnvironmentType = 'sandbox'
logger.info(f'Using environment: {ENVIRONMENT}')


def get_session_token(environment: EnvironmentType):
    """
    Get or generate a session token based on the environment.

    Args:
        environment (str): The environment type ('sandbox' or 'production').

    Returns:
        str: The session token if found or generated, None if the request fails.

    Examples:
        session_token = get_session_token('sandbox')
    """
    with shelve.open(str(Path(settings.SESSION_SHELF_DIR) / 'session_data')) as db:
        session_token = db.get('session_token')
        token_expiry = db.get('token_expiry')

        # Check if we have a valid token that hasn't expired
        if session_token and token_expiry and datetime.now() < token_expiry:
            logger.success('Found existing session token.', extra={'session_token': session_token})
            logger.info(f'Existing session token will expire at {token_expiry}.')
            return session_token

    # If we get here, we either don't have a token or it's expired
    logger.warning('Session token expired or invalid, generating new session token...')
    if environment == 'sandbox':
        url = f"{settings.TASTY_SANDBOX_BASE_URL}/sessions"
        logger.info(f'Using environment:{environment} with base url: {url}')
        payload = {
            "login": settings.TASTY_SANDBOX.USERNAME,
            "password": settings.TASTY_SANDBOX.PASSWORD
        }
    else:
        url = f"{settings.TASTY_PRODUCTION_BASE_URL}/sessions"
        logger.info(f'Using environment:{environment} with base url: {url}')
        payload = {
            "login": settings.TASTY_PRODUCTION.USERNAME,
            "password": settings.TASTY_PRODUCTION.PASSWORD
        }
    logger.debug('Generated payload.')
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=payload, headers=headers)
    logger.info(f'Posted request: {response}')

    if response.status_code == 201:
        logger.success(f'Response status code: {response.status_code}. Received session token.')
        data = response.json()
        new_session_token = data['data']['session-token']
        new_token_expiry = datetime.now() + timedelta(hours=24)
        logger.debug(f'Saved new session token expiring at: {new_token_expiry}.')

        # Open a new shelf connection to store the token
        with shelve.open(str(Path(settings.SESSION_SHELF_DIR) / 'session_data')) as db:
            db['session_token'] = new_session_token
            db['token_expiry'] = new_token_expiry
            logger.success('Stored new session token and token expiry.')

        return new_session_token
    else:
        logger.error(f'Session token request failed with response code: {response.status_code}.')
        logger.debug(f'{response.text}')
        return None

get_session_token(ENVIRONMENT)

[32m2024-12-19 17:35:39.295[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m38[0m - [1mUsing environment: sandbox[0m
[32m2024-12-19 17:35:41.069[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_session_token[0m:[36m68[0m - [1mUsing environment:sandbox with base url: https://api.cert.tastyworks.com/sessions[0m
[32m2024-12-19 17:35:41.073[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mget_session_token[0m:[36m80[0m - [34m[1mGenerated payload.[0m
[32m2024-12-19 17:35:41.726[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_session_token[0m:[36m83[0m - [1mPosted request: <Response [201]>[0m
[32m2024-12-19 17:35:41.728[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_session_token[0m:[36m86[0m - [32m[1mResponse status code: 201. Received session token.[0m
[32m2024-12-19 17:35:41.729[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mget_session_token[0m:[36m90[0m - [34m[1mSaved new session token expiring at: 2024-12-

'Fw-GJjaRWgGgwwhnIDUZoesRDMDHzaTi8Lrk_jaUcWqb2eDlfPyrBQ+C'

In [2]:
EnvironmentType = Literal['sandbox', 'production']  # create a type alias

# ENVIRONMENT toggles between sandbox (testing) and production (live trading)
ENVIRONMENT: EnvironmentType = 'sandbox'
logger.info(f'Using environment: {ENVIRONMENT}')

def get_quote_token(environment: EnvironmentType, session_token: str):
    """
    Get an API quote token for streaming market data through DXLink.
    
    This token identifies the customer to TastyTrade's quote provider (DXLink).
    Note: You must be a registered tastytrade customer (with an opened account) to access quote streaming.

    Args:
        environment (str): The environment type ('sandbox' or 'production').
        session_token (str): Valid session token for authentication.

    Returns:
        tuple[str, str]: A tuple of (quote_token, dxlink_url) if successful, (None, None) if failed.

    Examples:
        quote_token, dxlink_url = get_quote_token('sandbox', session_token)
    """
    with shelve.open(str(Path(settings.SESSION_SHELF_DIR) / 'session_data')) as db:
        quote_token = db.get('quote_token')
        dxlink_url = db.get('dxlink_url')
        quote_token_expiry = db.get('quote_token_expiry')

        # Check if we have a valid token that hasn't expired
        if quote_token and dxlink_url and quote_token_expiry and datetime.now() < quote_token_expiry:
            logger.success('Found existing quote token.', extra={'quote_token': quote_token})
            logger.info(f'Existing quote token will expire at {quote_token_expiry}.')
            return quote_token, dxlink_url

    # If we get here, we either don't have a token or it's expired
    logger.warning('Quote token expired or invalid, requesting new quote token...')
    if environment == 'sandbox':
        url = f"{settings.TASTY_SANDBOX_BASE_URL}/api-quote-tokens"
        logger.info(f'Using environment:{environment} with base url: {url}')
    else:
        url = f"{settings.TASTY_PRODUCTION_BASE_URL}/api-quote-tokens"
        logger.info(f'Using environment:{environment} with base url: {url}')

    headers = {
        "Authorization": session_token
    }
    
    logger.debug('Generated headers with session token.')
    response = requests.get(url, headers=headers)  # Using GET instead of POST
    logger.info(f'GET request: {response}')

    if response.status_code == 200:  # Success code for GET is 200, not 201
        logger.success(f'Response status code: {response.status_code}. Received quote token.')
        data = response.json()['data']
        new_quote_token = data['token']
        new_dxlink_url = data['dxlink-url']
        # Quote tokens are valid for 24 hours per documentation
        new_token_expiry = datetime.now() + timedelta(hours=24)
        logger.debug(f'Saved new quote token expiring at: {new_token_expiry}.')

        # Open a new shelf connection to store the token and dxlink url
        with shelve.open(str(Path(settings.SESSION_SHELF_DIR) / 'session_data')) as db:
            db['quote_token'] = new_quote_token
            db['dxlink_url'] = new_dxlink_url
            db['quote_token_expiry'] = new_token_expiry
            logger.success('Stored new quote token, dxlink url, and token expiry.')

        return new_quote_token, new_dxlink_url
    else:
        if response.status_code == 404:
            error_data = response.json().get('error', {})
            if error_data.get('code') == 'quote_streamer.customer_not_found_error':
                logger.error('Quote token request failed: You must be a registered tastytrade customer with an opened account to access quote streaming.')
        logger.error(f'Quote token request failed with response code: {response.status_code}.')
        logger.debug(f'{response.text}')
        return None, None

[32m2024-12-19 17:35:45.558[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m5[0m - [1mUsing environment: sandbox[0m


In [3]:
environment = 'sandbox'

session_token = get_session_token(environment)
if session_token:
    quote_token, dxlink_url = get_quote_token(environment, session_token)
    if quote_token and dxlink_url:
        # Use these for WebSocket connection to DXLink
        print(f"Ready to connect to {dxlink_url} with token {quote_token}")

[32m2024-12-19 17:35:50.193[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_session_token[0m:[36m60[0m - [32m[1mFound existing session token.[0m
[32m2024-12-19 17:35:50.196[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_session_token[0m:[36m61[0m - [1mExisting session token will expire at 2024-12-20 17:35:41.729062.[0m
[32m2024-12-19 17:35:50.210[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m39[0m - [1mUsing environment:sandbox with base url: https://api.cert.tastyworks.com/api-quote-tokens[0m
[32m2024-12-19 17:35:50.213[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m48[0m - [34m[1mGenerated headers with session token.[0m
[32m2024-12-19 17:35:50.719[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m50[0m - [1mGET request: <Response [200]>[0m
[32m2024-12-19 17:35:50.723[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m53[0m - [32m[1mRe

Ready to connect to wss://tasty-openapi-ws.dxfeed.com/realtime with token dGFzdHksYXBpLCwxNzM0NzE2MTUwLDE3MzQ2Mjk3NTAsVWZhOTRmMTRmLTkwOTctNGNmYi1hNDg5LWRlNmIwZTgxNDRiYg.WLYqtDYhvjwiAaNP3x8LQ3-1xE3tg8K_uhBiIYw2kP0


In [4]:
import websockets
import json
import asyncio
from datetime import datetime
from typing import Optional, Dict, List, Any, Callable
from dataclasses import dataclass

@dataclass
class DXLinkConfig:
    """Configuration for DXLink connection"""
    version: str = "0.1-DXF-JS/0.3.0"
    keepalive_timeout: int = 60
    accept_keepalive_timeout: int = 60
    feed_channel: int = 3  # Channel for market data feed
    auth_channel: int = 0  # Channel for auth and keepalive

class DXLinkClient:
    """Client for streaming market data from DXLink"""
    
    def __init__(self, dxlink_url: str, quote_token: str):
        self.url = dxlink_url
        self.token = quote_token
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.config = DXLinkConfig()
        self._running = False
        self._subscribed_symbols: Dict[str, List[str]] = {}
        self._callbacks: Dict[str, Callable] = {}
        self._keepalive_task: Optional[asyncio.Task] = None
        self._authorized = False

    async def connect(self):
        """Establish connection and perform setup sequence"""
        try:
            logger.info(f"Connecting to {self.url}")
            self.websocket = await websockets.connect(self.url)
            self._running = True
            
            # 1. Send SETUP message and wait for response
            setup_msg = {
                "type": "SETUP",
                "channel": self.config.auth_channel,
                "version": self.config.version,
                "keepaliveTimeout": self.config.keepalive_timeout,
                "acceptKeepaliveTimeout": self.config.accept_keepalive_timeout
            }
            logger.info("Sending SETUP message")
            await self._send_message(setup_msg)
            
            setup_response = await self._receive_message()
            if setup_response.get("type") != "SETUP":
                raise Exception(f"Unexpected response to SETUP: {setup_response}")
            logger.info("SETUP completed successfully")
            
            # 2. Wait for AUTH_STATE and send authorization
            auth_state = await self._receive_message()
            if (auth_state.get("type") != "AUTH_STATE" or 
                auth_state.get("state") != "UNAUTHORIZED"):
                raise Exception(f"Unexpected auth state: {auth_state}")
            
            auth_msg = {
                "type": "AUTH",
                "channel": self.config.auth_channel,
                "token": self.token
            }
            logger.info("Sending AUTH message")
            await self._send_message(auth_msg)
            
            # Wait for authorization confirmation
            auth_response = await self._receive_message()
            if (auth_response.get("type") != "AUTH_STATE" or 
                auth_response.get("state") != "AUTHORIZED"):
                raise Exception(f"Authorization failed: {auth_response}")
            
            self._authorized = True
            logger.info("Authorization successful")
            
            # 3. Open channel for market data
            channel_msg = {
                "type": "CHANNEL_REQUEST",
                "channel": self.config.feed_channel,
                "service": "FEED",
                "parameters": {"contract": "AUTO"}
            }
            logger.info("Requesting channel")
            await self._send_message(channel_msg)
            
            # Wait for channel confirmation
            channel_response = await self._receive_message()
            if (channel_response.get("type") != "CHANNEL_OPENED" or 
                channel_response.get("channel") != self.config.feed_channel):
                raise Exception(f"Channel opening failed: {channel_response}")
            
            logger.info("Channel opened successfully")
            
            # 4. Configure feed setup for Greeks data
            feed_setup_msg = {
                "type": "FEED_SETUP",
                "channel": self.config.feed_channel,
                "acceptAggregationPeriod": 0.1,
                "acceptDataFormat": "COMPACT",
                "acceptEventFields": {
                    "Greeks": [
                        "eventType",
                        "eventSymbol",
                        "price",
                        "volatility",
                        "delta",
                        "gamma",
                        "theta",
                        "rho",
                        "vega"
                    ]
                }
            }
            logger.info("Setting up feed")
            await self._send_message(feed_setup_msg)
            
            # Wait for feed configuration confirmation
            feed_config = await self._receive_message()
            if feed_config.get("type") != "FEED_CONFIG":
                raise Exception(f"Feed setup failed: {feed_config}")
            
            logger.info("Feed setup completed successfully")
            
            # Start keepalive task
            self._keepalive_task = asyncio.create_task(self._keepalive_loop())
            
            # Start message handling loop
            asyncio.create_task(self._message_handler())
            
            logger.success("Successfully connected to DXLink and completed setup sequence")
            return True
            
        except Exception as e:
            logger.error(f"Failed to connect to DXLink: {e}")
            if self.websocket:
                await self.websocket.close()
            self._running = False
            self._authorized = False
            return False

    async def subscribe_greeks(self, symbol: str):
        """Subscribe to Greeks events for a symbol"""
        if not self._authorized:
            logger.error("Cannot subscribe: not authorized")
            return
            
        try:
            subscription_msg = {
                "type": "FEED_SUBSCRIPTION",
                "channel": self.config.feed_channel,
                "reset": True,  # Reset to ensure clean subscription
                "add": [{"type": "Greeks", "symbol": symbol}]
            }
            logger.info(f"Subscribing to Greeks for {symbol}")
            await self._send_message(subscription_msg)
            self._subscribed_symbols[symbol] = ["Greeks"]
            
        except Exception as e:
            logger.error(f"Failed to subscribe to Greeks for {symbol}: {e}")

    async def _keepalive_loop(self):
        """Send keepalive messages every 30 seconds"""
        while self._running and self._authorized:
            try:
                keepalive_msg = {
                    "type": "KEEPALIVE",
                    "channel": self.config.auth_channel
                }
                await self._send_message(keepalive_msg)
                await asyncio.sleep(30)
            except Exception as e:
                logger.error(f"Keepalive error: {e}")
                await asyncio.sleep(1)

    async def _message_handler(self):
        """Handle incoming messages"""
        self.greeks_manager = GreeksManager()
        
        while self._running and self.websocket:
            try:
                raw_message = await self.websocket.recv()
                print(f"Raw message received: {raw_message}")
                
                message = json.loads(raw_message)
                if message.get("type") == "FEED_DATA":
                    greeks = self.greeks_manager.update(message)
                    if greeks and "greeks" in self._callbacks:
                        await self._callbacks["greeks"](greeks)
                elif message.get("type") == "AUTH_STATE":
                    await self._handle_auth_state(message)
                    
            except Exception as e:
                logger.error(f"Error handling message: {e}")
                await asyncio.sleep(1)

    async def _handle_feed_data(self, message: Dict[str, Any]):
        """Process incoming feed data"""
        try:
            data = message.get("data", [])
            if data and data[0] == "Greeks":
                # Parse Greeks data according to the configured fields
                greeks_data = {
                    "eventType": data[1],
                    "eventSymbol": data[2],
                    "price": data[3],
                    "volatility": data[4],
                    "delta": data[5],
                    "gamma": data[6],
                    "theta": data[7],
                    "rho": data[8],
                    "vega": data[9]
                }
                
                # Call registered callback if exists
                if "greeks" in self._callbacks:
                    await self._callbacks["greeks"](greeks_data)
                    
        except Exception as e:
            logger.error(f"Error processing feed data: {e}")
    
    # Add these methods to the DXLinkClient class
    def get_price(self, symbol: str) -> Optional[float]:
        return self.greeks_manager.get_field(symbol, 'price')
    
    def get_vega(self, symbol: str) -> Optional[float]:
        return self.greeks_manager.get_field(symbol, 'vega')
    
    def get_theta(self, symbol: str) -> Optional[float]:
        return self.greeks_manager.get_field(symbol, 'theta')
    
    def get_rho(self, symbol: str) -> Optional[float]:
        return self.greeks_manager.get_field(symbol, 'rho')

    async def _handle_auth_state(self, message: Dict[str, Any]):
        """Handle authentication state changes"""
        state = message.get("state")
        logger.info(f"Auth state changed to: {state}")

    async def _send_message(self, message: Dict[str, Any]):
        """Send a message to the WebSocket"""
        if self.websocket:
            await self.websocket.send(json.dumps(message))

    async def _receive_message(self) -> Dict[str, Any]:
        """Receive and parse a message from the WebSocket"""
        if self.websocket:
            message = await self.websocket.recv()
            return json.loads(message)
        return {}

    def on_greeks(self, callback: Callable[[Dict[str, Any]], None]):
        """Register callback for Greeks events"""
        self._callbacks["greeks"] = callback

    async def close(self):
        """Close the WebSocket connection"""
        self._running = False
        if self._keepalive_task:
            self._keepalive_task.cancel()
        if self.websocket:
            await self.websocket.close()

In [5]:
from dataclasses import dataclass
from typing import Dict, Optional, Any
from datetime import datetime

@dataclass
class GreeksData:
    """Data structure for Greeks values"""
    symbol: str
    price: float
    volatility: float
    delta: float
    gamma: float
    theta: float
    rho: float
    vega: float
    timestamp: datetime

    @classmethod
    def from_feed_data(cls, data: list) -> 'GreeksData':
        """Create GreeksData from raw feed data array"""
        # data[1] contains the actual values array
        values = data[1]
        return cls(
            symbol=values[1],
            price=float(values[2]),
            volatility=float(values[3]),
            delta=float(values[4]),
            gamma=float(values[5]),
            theta=float(values[6]),
            rho=float(values[7]),
            vega=float(values[8]),
            timestamp=datetime.utcnow()
        )

class GreeksManager:
    """Manages latest Greeks data for multiple symbols"""
    def __init__(self):
        self._latest_data: Dict[str, GreeksData] = {}
    
    def update(self, message: dict) -> Optional[GreeksData]:
        """Update Greeks data from a feed message"""
        try:
            data = message.get('data', [])
            if not data or data[0] != 'Greeks':
                return None
            
            greeks = GreeksData.from_feed_data(data)
            self._latest_data[greeks.symbol] = greeks
            return greeks
        except Exception as e:
            logger.error(f"Error updating Greeks data: {e}")
            return None
    
    def get_field(self, symbol: str, field: str) -> Optional[float]:
        """Get specific field value for a symbol"""
        greeks = self._latest_data.get(symbol)
        if greeks:
            return getattr(greeks, field, None)
        return None

In [6]:
from datetime import datetime, timezone, timedelta
import requests
from typing import Optional, Tuple

async def handle_greeks(greeks_data: GreeksData):
    """Handle incoming Greeks data with field-specific access"""
    print("\nParsed data:")
    print(f"Symbol: {greeks_data.symbol}")
    print(f"Price: {greeks_data.price:.4f}")
    print(f"Individual Greeks:")
    print(f"  Delta: {greeks_data.delta:.4f}")
    print(f"  Gamma: {greeks_data.gamma:.4f}")
    print(f"  Theta: {greeks_data.theta:.4f}")
    print(f"  Vega: {greeks_data.vega:.4f}")
    print(f"  Rho: {greeks_data.rho:.4f}")
    print(f"  IV: {greeks_data.volatility:.4f}")
    print(f"Timestamp: {greeks_data.timestamp}")

def get_closest_45_dte_spy_streamer_symbol(environment: str, session_token: str) -> Optional[str]:
    """
    Fetch the streamer symbol for the SPY option chain closest to 45 DTE.
    
    Args:
        environment (str): 'sandbox' or 'production'
        session_token (str): Valid session token for authentication
        
    Returns:
        Optional[str]: The streamer symbol if found, None otherwise
    """
    try:
        # Set up the base URL based on environment
        base_url = (settings.TASTY_SANDBOX_BASE_URL if environment == 'sandbox' 
                   else settings.TASTY_PRODUCTION_BASE_URL)
        
        # Configure headers with session token
        headers = {"Authorization": session_token}
        
        # Get option chains for SPY
        url = f"{base_url}/option-chains/SPY"
        response = requests.get(url, headers=headers)
        
        if response.status_code != 200:
            logger.error(f"Failed to fetch option chains: {response.status_code}")
            logger.debug(response.text)
            return None
            
        data = response.json()['data']
        
        # Calculate target date (45 days from now)
        target_date = datetime.now(timezone.utc) + timedelta(days=45)
        
        # Find the expiration closest to 45 days
        closest_expiration = None
        min_diff = float('inf')
        
        for item in data['items']:
            # Make sure to parse the expiration date as UTC
            expiration_str = item['expiration-date']
            # Convert to UTC aware datetime
            expiration = datetime.strptime(expiration_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            
            diff = abs((expiration - target_date).total_seconds())
            if diff < min_diff:
                min_diff = diff
                closest_expiration = item
        
        if not closest_expiration:
            logger.error("No valid expiration dates found")
            return None
            
        # Get the streamer symbol from the closest expiration
        streamer_symbol = closest_expiration.get('streamer-symbol')
        
        if not streamer_symbol:
            logger.error("No streamer symbol found in response")
            return None
            
        logger.success(f"Found streamer symbol: {streamer_symbol} for expiration {closest_expiration['expiration-date']}")
        return streamer_symbol
        
    except Exception as e:
        logger.error(f"Error fetching streamer symbol: {e}")
        return None

# Usage in Jupyter notebook
environment = 'sandbox'
session_token = get_session_token(environment)
if not session_token:
    print("Failed to get session token")
else:
    quote_token, dxlink_url = get_quote_token(environment, session_token)
    if not quote_token or not dxlink_url:
        print("Failed to get quote token")
    else:
        # Get the streamer symbol (synchronous part)
        streamer_symbol = get_closest_45_dte_spy_streamer_symbol(environment, session_token)
        if not streamer_symbol:
            print("Failed to get streamer symbol")
        else:
            print(f"Got streamer symbol: {streamer_symbol}")

            async def stream_greeks():
                client = DXLinkClient(dxlink_url, quote_token)
                if not await client.connect():
                    print("Failed to connect to DXLink")
                    return
                
                await client.subscribe_greeks(streamer_symbol)
                client.on_greeks(handle_greeks)
                
                try:
                    while True:
                        # Example of accessing individual fields
                        await asyncio.sleep(5)  # Check every 5 seconds
                        price = client.get_price(streamer_symbol)
                        vega = client.get_vega(streamer_symbol)
                        print(f"\nIndividual field access:")
                        print(f"Current price: {price:.4f}")
                        print(f"Current vega: {vega:.4f}")
                except KeyboardInterrupt:
                    await client.close()
            
            # Start the streaming
            future = asyncio.ensure_future(stream_greeks())

[32m2024-12-19 17:36:35.269[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_session_token[0m:[36m60[0m - [32m[1mFound existing session token.[0m
[32m2024-12-19 17:36:35.270[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_session_token[0m:[36m61[0m - [1mExisting session token will expire at 2024-12-20 17:35:41.729062.[0m
[32m2024-12-19 17:36:35.280[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m31[0m - [32m[1mFound existing quote token.[0m
[32m2024-12-19 17:36:35.282[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_quote_token[0m:[36m32[0m - [1mExisting quote token will expire at 2024-12-20 17:35:50.728869.[0m


[32m2024-12-19 17:36:37.130[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mget_closest_45_dte_spy_streamer_symbol[0m:[36m78[0m - [32m[1mFound streamer symbol: .SPY250131C340 for expiration 2025-01-31[0m


Got streamer symbol: .SPY250131C340


[32m2024-12-19 17:36:37.142[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m34[0m - [1mConnecting to wss://tasty-openapi-ws.dxfeed.com/realtime[0m
[32m2024-12-19 17:36:38.449[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m46[0m - [1mSending SETUP message[0m
[32m2024-12-19 17:36:38.554[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m52[0m - [1mSETUP completed successfully[0m
[32m2024-12-19 17:36:38.564[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m65[0m - [1mSending AUTH message[0m
[32m2024-12-19 17:36:38.669[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m75[0m - [1mAuthorization successful[0m
[32m2024-12-19 17:36:38.670[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m84[0m - [1mRequesting channel[0m
[32m2024-12-19 17:36:38.770[0m | [1mINFO    [0m | [36m__main__[0m:[36mconnect[0m:[36m93[0m - [1mChannel opened successfully[0m
[32m2024-12-19 17:36:

Raw message received: {"type":"FEED_CONFIG","channel":3,"dataFormat":"COMPACT","aggregationPeriod":0.1,"eventFields":{"Greeks":["eventType","eventSymbol","price","volatility","delta","gamma","theta","rho","vega"]}}
Raw message received: {"type":"FEED_DATA","channel":3,"data":["Greeks",["Greeks",".SPY250131C340",249.326817811741,0.81374179228541,0.982073608162892,2.68066571472274E-4,-0.0840584077183417,0.388111200594431,0.0892108203779977]]}

Parsed data:
Symbol: .SPY250131C340
Price: 249.3268
Individual Greeks:
  Delta: 0.9821
  Gamma: 0.0003
  Theta: -0.0841
  Vega: 0.0892
  Rho: 0.3881
  IV: 0.8137
Timestamp: 2024-12-19 17:36:40.003068


  timestamp=datetime.utcnow()



Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892
Raw message received: {"type":"KEEPALIVE","channel":0}

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892

Individual field access:
Current price: 249.3268
Current vega: 0.0892
