In [2]:
# Cell 1: Imports and Setup
import asyncio
import logging
import os
from dotenv import load_dotenv
from src.core.polymarket_websocket_client import PolymarketWebSocketClient

# Configure logging: INFO and above go to a stream handler, timestamped.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)

# Load environment variables (load_dotenv() is called before the lookups so
# values from a .env file, if any, are visible to os.getenv).
load_dotenv()
API_KEY = os.getenv("POLYMARKET_API_KEY")
API_SECRET = os.getenv("POLYMARKET_API_SECRET")
API_PASSPHRASE = os.getenv("POLYMARKET_API_PASSPHRASE")

# Surface missing credentials right away instead of letting None values flow
# into the WebSocket client. Only the variable names are logged, never values.
_missing_credentials = [
    env_name
    for env_name, env_value in (
        ("POLYMARKET_API_KEY", API_KEY),
        ("POLYMARKET_API_SECRET", API_SECRET),
        ("POLYMARKET_API_PASSPHRASE", API_PASSPHRASE),
    )
    if not env_value
]
if _missing_credentials:
    logging.warning(
        "Missing Polymarket credentials: %s", ", ".join(_missing_credentials)
    )


In [3]:

# Cell 2: Message Handler
async def message_handler(message):
    """Log incoming WebSocket messages.

    Args:
        message: Either a list of payloads or a single payload of any type.
            Each list element that is a dict with a truthy ``"event_type"``
            entry is logged together with that event type; every other
            element, and any non-list ``message``, is logged as-is.
    """
    if not isinstance(message, list):
        logging.info(f"Received message: {message}")
        return

    for msg in message:
        # Only dicts can carry an event type; guarding the lookup keeps a
        # non-dict element (string, number, ...) from raising AttributeError.
        event_type = msg.get("event_type") if isinstance(msg, dict) else None
        if event_type:
            logging.info(f"Received {event_type} message: {msg}")
        else:
            logging.info(f"Received message: {msg}")

# Cell 3: WebSocket Test Function
async def test_websocket(duration=60, markets=None):
    """Run a timed smoke test of the Polymarket WebSocket client.

    Builds a client from the module-level credentials with ``message_handler``
    registered as its message callback, starts it on the ``user`` channel for
    the given markets, sleeps for ``duration`` seconds, and then stops the
    client. Exceptions raised while waiting are logged rather than re-raised;
    the client is stopped in every case once it has been started.

    Args:
        duration: Number of seconds to keep listening before stopping the
            client. Defaults to 60.
        markets: Market condition IDs to subscribe to. When ``None``, a single
            example ID is used (replace it with real ones from your markets).
    """
    if markets is None:
        # Example market condition IDs (replace with real ones from your markets)
        markets = [
            "112744708863056390222838400251590316345033278599357903193546628977027180579530"
        ]

    # Initialize WebSocket client
    ws_client = PolymarketWebSocketClient(
        api_key=API_KEY,
        api_secret=API_SECRET,
        api_passphrase=API_PASSPHRASE,
        message_callback=message_handler,
        ws_url="wss://ws-subscriptions-clob.polymarket.com/ws/"
    )

    logging.info("Starting WebSocket client...")

    # Start the client and subscribe to user channel with specific markets.
    # The return value is kept referenced for the duration of the test;
    # presumably it is the client's background listener task - TODO confirm.
    listener_task = await ws_client.start(
        channel_type="user",
        markets=markets
    )

    try:
        # Keep this coroutine alive so messages can be captured meanwhile
        logging.info(f"Listening for messages for {duration} seconds...")
        await asyncio.sleep(duration)
    except Exception as e:
        logging.error(f"Error during WebSocket test: {e}")
    finally:
        # Clean up
        await ws_client.stop()
        logging.info("WebSocket test completed")

# Cell 4: Run the Test
# Jupyter already runs an asyncio event loop, so asyncio.run() cannot be used there; run_test() schedules the coroutine on the running loop instead
def run_test():
    """Run ``test_websocket`` in whichever way fits the current context.

    Returns:
        The scheduled ``asyncio.Task`` when an event loop is already running
        in this thread (e.g. inside Jupyter); the caller may await it.
        ``None`` when no loop is running, in which case the test is run to
        completion with ``asyncio.run`` before returning.
    """
    try:
        # get_running_loop() raises RuntimeError when no loop is running.
        # Unlike get_event_loop(), it is not deprecated for this use and keeps
        # working after an earlier asyncio.run() has cleared the current loop.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # If running outside Jupyter, use this
        asyncio.run(test_websocket())
        return None

    # If running in Jupyter, schedule on the live loop and hand the task back
    return loop.create_task(test_websocket())

# Execute the test. When an event loop is already running, run_test() returns
# the scheduled Task, which the notebook then displays as this cell's output.
run_test()

<Task pending name='Task-4' coro=<test_websocket() running at /var/folders/qq/b2n1bnrj4dd7b2qz0_1_8gfm0000gn/T/ipykernel_11305/165134432.py:15>>

2025-04-01 18:43:52,799 - INFO - Starting WebSocket client...
2025-04-01 18:43:52,803 - INFO - Listening for messages for 60 seconds...
2025-04-01 18:43:52,808 - INFO - Trying to establish WebSocket connection
2025-04-01 18:43:53,024 - INFO - Connected to WebSocket on channel user
2025-04-01 18:43:53,029 - INFO - Subscribed to user channel
