In [1]:
# Cell 1: Imports and Setup

import requests
import os
import time
import datetime as dt
from datetime import timezone, timedelta
import base64
import json
from pathlib import Path
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
import pandas as pd
from tqdm.notebook import tqdm 
from dotenv import load_dotenv # Make sure to load it if .env is not in current dir: load_dotenv('path/to/.env')
import logging
import re 

# --- Logging Setup ---
logger_name = f"kalshi_fetch_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}" # Added YYYYMMDD for uniqueness
logger = logging.getLogger(logger_name)
if not logger.handlers: # Avoid adding multiple handlers if re-running cell
    logger.setLevel(logging.INFO) 
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s.%(funcName)s - %(message)s')
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)
else: # If handlers exist, just ensure level is set (e.g. if you change it)
    logger.setLevel(logging.INFO)


# --- Configuration ---
load_dotenv() # Loads .env file from the current directory or a specified path
KALSHI_API_KEY_ID = os.getenv("KALSHI_API_KEY_ID")
KALSHI_PRIVATE_KEY_PATH = os.getenv("KALSHI_PRIVATE_KEY_PATH")
KALSHI_BASE_URL = "" # Will be set based on IS_DEMO_MODE

IS_DEMO_MODE = os.getenv("KALSHI_DEMO_MODE", "false").lower() == "true"
if IS_DEMO_MODE:
    logger.info("KALSHI: Running in DEMO mode.")
    KALSHI_BASE_URL = "https://demo-api.kalshi.co" # Standard demo base URL
    # For demo, you might need different API keys if they are separate
    KALSHI_DEMO_API_KEY_ID = os.getenv("KALSHI_DEMO_API_KEY_ID")
    KALSHI_DEMO_PRIVATE_KEY_PATH = os.getenv("KALSHI_DEMO_PRIVATE_KEY_PATH")
    if KALSHI_DEMO_API_KEY_ID: KALSHI_API_KEY_ID = KALSHI_DEMO_API_KEY_ID
    if KALSHI_DEMO_PRIVATE_KEY_PATH: KALSHI_PRIVATE_KEY_PATH = KALSHI_DEMO_PRIVATE_KEY_PATH
else:
    logger.info("KALSHI: Running in PRODUCTION mode.")
    # The Kalshi API docs you provided use "https://api.elections.kalshi.com"
    # For general trading API v2, "https://api.kalshi.com" is often the base.
    # We will use the one from your setup/docs for now.
    KALSHI_BASE_URL = "https://api.elections.kalshi.com" 
    # If KALSHI_BASE_URL = "https://api.elections.kalshi.com" doesn't work for non-election
    # markets like Bitcoin, you might need to change it to "https://api.kalshi.com"
    # and the full path would be, e.g., "https://api.kalshi.com/trade-api/v2/events".

# --- Auth Functions ---
private_key_global = None # Initialize as global

def load_private_key(file_path: str) -> rsa.RSAPrivateKey | None:
    global private_key_global # Declare intention to modify global
    if not file_path: # Handle case where path might be None
        logger.error("Private key file path is not provided.")
        return None
    try:
        with open(file_path, "rb") as key_file:
            private_key_global = serialization.load_pem_private_key(key_file.read(), password=None)
        logger.info(f"Private key loaded successfully from {file_path}")
        return private_key_global
    except FileNotFoundError:
        logger.error(f"Private key file not found: {file_path}")
        private_key_global = None
        return None
    except Exception as e:
        logger.error(f"Error loading private key from {file_path}: {e}")
        private_key_global = None
        return None

def sign_pss_text(private_key: rsa.RSAPrivateKey, text: str) -> str | None:
    if not private_key:
        logger.error("Private key not available for signing.")
        return None
    message = text.encode('utf-8')
    try:
        signature = private_key.sign(
            message, padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.DIGEST_LENGTH),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode('utf-8')
    except Exception as e:
        logger.error(f"Error during signing: {e}")
        return None

def get_kalshi_auth_headers(method: str, path: str) -> dict | None:
    if not private_key_global:
        logger.error("Global private_key_global not loaded. Cannot create auth headers.")
        return None
    if not KALSHI_API_KEY_ID:
        logger.error("Global KALSHI_API_KEY_ID not set. Cannot create auth headers.")
        return None
        
    timestamp_ms_str = str(int(time.time() * 1000))
    # Ensure path starts with '/' for consistency if endpoint_path might vary
    if not path.startswith('/'):
        path = '/' + path
    message_to_sign = timestamp_ms_str + method.upper() + path
    signature = sign_pss_text(private_key_global, message_to_sign)
    if signature is None: return None
    
    return {
        'accept': 'application/json',
        'KALSHI-ACCESS-KEY': KALSHI_API_KEY_ID,
        'KALSHI-ACCESS-SIGNATURE': signature,
        'KALSHI-ACCESS-TIMESTAMP': timestamp_ms_str
    }

# --- Initialize private key ---
if not (KALSHI_API_KEY_ID and KALSHI_PRIVATE_KEY_PATH):
    logger.critical("CRITICAL: KALSHI_API_KEY_ID or KALSHI_PRIVATE_KEY_PATH not found in .env file or environment variables.")
else:
    # Check if path is absolute, if not, make it relative to a known location (e.g., script dir)
    # For simplicity, assuming KALSHI_PRIVATE_KEY_PATH is correctly set (e.g., absolute or relative to cwd)
    expanded_path = Path(KALSHI_PRIVATE_KEY_PATH).expanduser().resolve()
    if not expanded_path.exists():
        logger.critical(f"CRITICAL: Private key file does not exist at resolved path: {expanded_path}")
        # Attempt to load anyway, load_private_key will log the FileNotFoundError
    load_private_key(str(expanded_path))


if private_key_global:
    logger.info("Kalshi client setup complete. Private key loaded.")
else:
    logger.error("Kalshi client setup failed: Private key could not be loaded. API calls will fail.")

2025-05-18 19:48:10,279 - INFO - kalshi_fetch_20250518_194810.<module> - KALSHI: Running in PRODUCTION mode.
2025-05-18 19:48:10,335 - INFO - kalshi_fetch_20250518_194810.load_private_key - Private key loaded successfully from /Users/omarabul-hassan/Desktop/projects/kalshi/key.pem
2025-05-18 19:48:10,336 - INFO - kalshi_fetch_20250518_194810.<module> - Kalshi client setup complete. Private key loaded.


In [2]:
# Cell 2: Kalshi API Request Function

def kalshi_api_get_request(endpoint_path: str, params: dict = None) -> dict | None:
    """
    Makes an authenticated GET request to the Kalshi API.
    
    Args:
        endpoint_path (str): The API endpoint path (e.g., "/trade-api/v2/events").
                             It should start with a '/'.
        params (dict, optional): Query parameters for the request. Defaults to None.
        
    Returns:
        dict | None: The JSON response as a dictionary, or None if an error occurs.
    """
    if not private_key_global:
        logger.error("Private key is not loaded. Cannot make API request.")
        return None
    if not KALSHI_BASE_URL:
        logger.error("KALSHI_BASE_URL is not set. Cannot make API request.")
        return None

    # Ensure endpoint_path starts with a '/'
    if not endpoint_path.startswith('/'):
        endpoint_path = '/' + endpoint_path
        
    full_url = f"{KALSHI_BASE_URL}{endpoint_path}"
    
    # The path for signing should not include the base URL, just the endpoint path like "/trade-api/v2/events"
    auth_headers = get_kalshi_auth_headers("GET", endpoint_path)
    if not auth_headers:
        logger.error(f"Failed to generate authentication headers for path: {endpoint_path}")
        return None

    try:
        logger.info(f"Making GET request to: {full_url} with params: {params}")
        response = requests.get(full_url, headers=auth_headers, params=params, timeout=20) # Added timeout
        response.raise_for_status() # Raises an HTTPError for bad responses (4XX or 5XX)
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred: {http_err} - Status: {response.status_code} - Response: {response.text}")
    except requests.exceptions.ConnectionError as conn_err:
        logger.error(f"Connection error occurred: {conn_err} to {full_url}")
    except requests.exceptions.Timeout as timeout_err:
        logger.error(f"Timeout error occurred: {timeout_err} for {full_url}")
    except requests.exceptions.RequestException as req_err:
        logger.error(f"An error occurred during the request: {req_err} for {full_url}")
    except json.JSONDecodeError:
        logger.error(f"Failed to decode JSON response from {full_url}. Response text: {response.text[:500]}...") # Log snippet
    return None

In [None]:
# Cell 3: Fetch and Display Filtered Bitcoin Markets

# --- Configuration for this cell ---
# From the example event KXBTCD-25MAY1523, the series ticker is KXBTCD.
TARGET_SERIES_TICKER = "KXBTCD" 

# Regex to match event titles like "Bitcoin price today at 11pm EDT?"
# This is made more general to capture variations like "Bitcoin price on May 20 at..."
EVENT_TITLE_REGEX_PATTERN = r"Bitcoin price .*? at \d{1,2}(?:am|pm)? EDT\?"
EVENT_TITLE_REGEX = re.compile(EVENT_TITLE_REGEX_PATTERN, re.IGNORECASE)

logger.info(f"Attempting to fetch events for series: {TARGET_SERIES_TICKER} matching pattern: '{EVENT_TITLE_REGEX_PATTERN}'")

all_matching_events_markets = []
cursor = None
page_count = 0
max_pages_to_fetch = 10  # Safety break for pagination to avoid infinite loops during testing
fetched_event_count = 0

while page_count < max_pages_to_fetch:
    page_count += 1
    logger.info(f"Fetching page {page_count} of events for series '{TARGET_SERIES_TICKER}'...")
    
    params = {
        "series_ticker": TARGET_SERIES_TICKER,
        "status": "closed,settled",  # We want historical data for backtesting
        "limit": 50,                 # Number of events per page (max 200 for /events)
        "with_nested_markets": "true" # Include market data directly in the event response
    }
    if cursor:
        params["cursor"] = cursor

    # The endpoint for events is /trade-api/v2/events
    response_data = kalshi_api_get_request("/trade-api/v2/events", params)

    if not response_data:
        logger.error("Failed to fetch events or response was empty. Stopping pagination.")
        break

    events = response_data.get("events", [])
    if not events:
        logger.info("No more events found for this series and status on this page.")
        break
    
    fetched_event_count += len(events)

    for event in events:
        event_ticker = event.get("ticker")
        event_title = event.get("title")
        
        if event_title and EVENT_TITLE_REGEX.search(event_title):
            logger.info(f"MATCH: Event '{event_title}' (Ticker: {event_ticker})")
            markets_data = event.get("markets", [])
            if markets_data:
                event_market_info = {
                    "event_ticker": event_ticker,
                    "event_title": event_title,
                    "open_date": event.get("open_date"),
                    "close_date": event.get("close_date"),
                    "settlement_date": event.get("settlement_date"),
                    "markets": []
                }
                for market in markets_data:
                    market_details = {
                        "market_ticker": market.get("ticker"),
                        "subtitle": market.get("subtitle"), # e.g., "$60000 or more"
                        "status": market.get("status"),
                        "result": market.get("result"), # 'yes' or 'no' for binary markets after settlement
                        "yes_price_latest": market.get("yes_price"), 
                        "no_price_latest": market.get("no_price"),
                    }
                    event_market_info["markets"].append(market_details)
                all_matching_events_markets.append(event_market_info)
            else:
                logger.info(f"  - Event '{event_title}' (Ticker: {event_ticker}) matched title but had no nested markets in this response.")
        # else: # For debugging non-matches
        #     if event_title:
        #         logger.debug(f"NO MATCH: Event '{event_title}' (Ticker: {event_ticker})")
        #     else:
        #         logger.debug(f"NO MATCH: Event with no title (Ticker: {event_ticker})")

    cursor = response_data.get("cursor")
    if not cursor:
        logger.info("No more pages (cursor is null). End of event list.")
        break
    
    logger.info(f"Fetched {len(events)} events on this page. Moving to next page with cursor: {cursor[:10]}...")
    time.sleep(0.5) # Be respectful to the API, small delay between paginated requests

logger.info(f"Total events scanned from series '{TARGET_SERIES_TICKER}': {fetched_event_count} over {page_count} page(s).")

# --- Displaying the collected market information ---
if all_matching_events_markets:
    print(f"\n--- Found {len(all_matching_events_markets)} Bitcoin Hourly Events (Closed/Settled) Matching Pattern ---")
    for event_data in sorted(all_matching_events_markets, key=lambda x: x.get("close_date") or "", reverse=True): # Sort by close_date
        print(f"\nEvent: {event_data['event_title']} (Ticker: {event_data['event_ticker']})")
        print(f"  Closes: {event_data.get('close_date')}, Settles: {event_data.get('settlement_date')}")
        if event_data['markets']:
            for m_info in event_data['markets']:
                print(f"  Market Ticker: {m_info['market_ticker']}")
                print(f"    Subtitle: {m_info['subtitle']}")
                print(f"    Status: {m_info['status']}, Result: {m_info['result']}")
        else:
            print("  No markets listed for this event.")
else:
    logger.warning(f"No events matching the title regex '{EVENT_TITLE_REGEX.pattern}' were found for series '{TARGET_SERIES_TICKER}' with status 'closed,settled'.")
    logger.warning("Possible reasons: ")
    logger.warning("1. The series_ticker is incorrect or has no such events.")
    logger.warning("2. The EVENT_TITLE_REGEX is too restrictive or doesn't match Kalshi's naming convention for these events.")
    logger.warning("3. No 'closed' or 'settled' events of this type exist, or they are very old and require more pagination.")
    logger.warning("4. API connection or authentication issues (check logs from Cell 1 and Cell 2).")
    logger.warning(f"5. The base URL '{KALSHI_BASE_URL}' might not serve this type of market data (see notes in Cell 1).")
    print(f"\nNo events matching the title regex '{EVENT_TITLE_REGEX_PATTERN}' were found for series '{TARGET_SERIES_TICKER}'.")
    print("Please check the logs and configurations.")

# You can now work with `all_matching_events_markets` list of dictionaries.
# For example, to create a DataFrame of all market tickers:
# market_list_for_df = []
# for event_data in all_matching_events_markets:
#     for m_info in event_data['markets']:
#         market_list_for_df.append({
#             'event_ticker': event_data['event_ticker'],
#             'event_title': event_data['event_title'],
#             'market_ticker': m_info['market_ticker'],
#             'market_subtitle': m_info['subtitle'],
#             'market_status': m_info['status'],
#             'market_result': m_info['result'],
#             'event_close_date': event_data.get('close_date')
#         })
# if market_list_for_df:
#    markets_df = pd.DataFrame(market_list_for_df)
#    print("\n--- DataFrame of Markets ---")
#    print(markets_df.head())
#    print(f"Total markets found: {len(markets_df)}")

In [11]:
# Cell 4: Extract Market Tickers and Save to CSV

import pandas as pd
import datetime as dt

# --- Configuration for this cell ---
# Ensure `all_matching_events_markets` variable exists from the previous cell execution
if 'all_matching_events_markets' not in globals() or not all_matching_events_markets:
    logger.error("The 'all_matching_events_markets' list is not defined or is empty.")
    print("Please ensure Cell 3 has been run successfully and found markets before running this cell.")
else:
    market_tickers_list = []
    for event_data in all_matching_events_markets:
        if event_data.get('markets'): # Check if the 'markets' key exists and is not empty
            for market_info in event_data['markets']:
                if market_info.get('market_ticker'): # Check if 'market_ticker' key exists
                    market_tickers_list.append(market_info['market_ticker'])
    
    if market_tickers_list:
        # Create a DataFrame with a single column for the market tickers
        tickers_df = pd.DataFrame(market_tickers_list, columns=['market_ticker'])
        
        # Remove any duplicate tickers, if any (shouldn't be if data is clean)
        tickers_df.drop_duplicates(inplace=True)
        
        # Define the output CSV file name
        # Adding a timestamp to the filename to avoid overwriting and keep track of versions
        timestamp_str = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        csv_filename = f"kalshi_btc_hourly_market_tickers_{TARGET_SERIES_TICKER}_{timestamp_str}.csv"
        
        try:
            tickers_df.to_csv(csv_filename, index=False)
            logger.info(f"Successfully saved {len(tickers_df)} unique market tickers to {csv_filename}")
            print(f"\nSuccessfully saved {len(tickers_df)} unique market tickers to: {csv_filename}")
            print("Sample of saved tickers:")
            print(tickers_df.head().to_string())
        except Exception as e:
            logger.error(f"Error saving market tickers to CSV: {e}")
            print(f"An error occurred while saving to CSV: {e}")
            
    else:
        logger.warning("No market tickers were extracted from 'all_matching_events_markets'. CSV not created.")
        print("No market tickers found to save.")

2025-05-15 19:36:47,312 - INFO - kalshi_fetch_20250515_192458.<module> - Successfully saved 36455 unique market tickers to kalshi_btc_hourly_market_tickers_KXBTCD_20250515_193647.csv



Successfully saved 36455 unique market tickers to: kalshi_btc_hourly_market_tickers_KXBTCD_20250515_193647.csv
Sample of saved tickers:
                 market_ticker
0  KXBTCD-25MAY1522-T112249.99
1  KXBTCD-25MAY1522-T111999.99
2  KXBTCD-25MAY1522-T111749.99
3  KXBTCD-25MAY1522-T111499.99
4  KXBTCD-25MAY1522-T111249.99


In [None]:
# Cell 5: Fetch Historical Candlestick Data for Markets (Save per Market, Limited N)

import pandas as pd
import time
import datetime as dt
from datetime import timezone # Explicit import for timezone.utc
import glob # To find the latest tickers CSV
import os # For creating directory
# Ensure tqdm is imported if not already: from tqdm.notebook import tqdm

# --- Configuration & Constants for this cell ---
if 'TARGET_SERIES_TICKER' not in globals():
    TARGET_SERIES_TICKER = "KXBTCD" 
    logger.info(f"TARGET_SERIES_TICKER was not in globals, set to default: {TARGET_SERIES_TICKER}")

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!! SET THE NUMBER OF MARKETS TO PROCESS HERE !!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
NUMBER_OF_MARKETS_TO_PROCESS = 1000 # <--- CHANGE THIS VALUE AS NEEDED
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

PERIOD_MINUTES = 1
MAX_PERIODS_PER_REQUEST = 4900 
API_DELAY_SECONDS = 0.75 

# Create a directory for individual market CSVs
DATA_OUTPUT_DIR_BASE = "market_candlestick_data"
# Add a suffix to the directory name if processing a limited number, for clarity
dir_suffix = f"_first_{NUMBER_OF_MARKETS_TO_PROCESS}_markets" if NUMBER_OF_MARKETS_TO_PROCESS > 0 else ""
INDIVIDUAL_MARKET_DATA_DIR = os.path.join(DATA_OUTPUT_DIR_BASE, f"{TARGET_SERIES_TICKER}_candlesticks_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}{dir_suffix}")
os.makedirs(INDIVIDUAL_MARKET_DATA_DIR, exist_ok=True)
logger.info(f"Individual market CSVs will be saved in: {INDIVIDUAL_MARKET_DATA_DIR}")

# --- Helper Function to parse ISO date strings to UTC Unix timestamp ---
def parse_iso_to_unix_timestamp(date_string: str | None) -> int | None:
    if not date_string:
        return None
    try:
        if date_string.endswith('Z'):
            dt_obj = dt.datetime.fromisoformat(date_string.replace('Z', '+00:00'))
        else: 
            dt_obj = dt.datetime.fromisoformat(date_string)
            if dt_obj.tzinfo is None: 
                 dt_obj = dt_obj.replace(tzinfo=timezone.utc)
        return int(dt_obj.timestamp())
    except Exception as e:
        logger.error(f"Error parsing date string '{date_string}': {e}")
        return None

# --- Helper Function to Fetch Market Details (Open/Close Dates) ---
def fetch_market_details(market_ticker: str) -> dict | None:
    logger.info(f"Fetching details for market: {market_ticker}")
    api_path = f"/trade-api/v2/markets/{market_ticker}"
    response_data = kalshi_api_get_request(api_path) 
    if response_data and "market" in response_data:
        market_data = response_data["market"]
        details = {
            "open_ts": parse_iso_to_unix_timestamp(market_data.get("open_time")), 
            "close_ts": parse_iso_to_unix_timestamp(market_data.get("close_time")),
            "expiration_ts": parse_iso_to_unix_timestamp(market_data.get("expiration_time")),
            "expected_expiration_ts": parse_iso_to_unix_timestamp(market_data.get("expected_expiration_time")),
            "status": market_data.get("status")
        }
        if not details["open_ts"] or not details["close_ts"]:
             logger.warning(f"Could not parse open_ts or close_ts for {market_ticker} from 'open_time'/'close_time'. API Response market_data: {market_data}")
             return None
        logger.info(f"Details for {market_ticker}: Open: {details['open_ts']} ({market_data.get('open_time')}), Close: {details['close_ts']} ({market_data.get('close_time')}), Status: {details['status']}")
        return details
    else:
        logger.error(f"Failed to fetch or parse details for market: {market_ticker}. Response: {response_data}")
        return None

# --- Main Candlestick Fetching Function (Adapted from Professor's Snippet) ---
fetch_candlesticks_for_market_first_candle_logged_this_run = False

def fetch_candlesticks_for_market(market_ticker: str, 
                                  series_ticker: str, 
                                  start_ts_s: int, 
                                  end_ts_s: int, 
                                  period_minutes: int) -> list: # Return list, not DataFrame
    global fetch_candlesticks_for_market_first_candle_logged_this_run
    all_candlesticks_for_this_market = []
    if not all([market_ticker, series_ticker, start_ts_s, end_ts_s, period_minutes]):
        logger.error(f"Missing one or more required parameters for fetching candlesticks for {market_ticker}.")
        return all_candlesticks_for_this_market

    current_start_ts = start_ts_s
    total_duration_seconds = end_ts_s - start_ts_s
    period_seconds = period_minutes * 60
    if period_seconds == 0: 
        logger.error("Period_seconds is zero, aborting candlestick fetch.")
        return []
    total_expected_intervals = (total_duration_seconds // period_seconds) + 1 if total_duration_seconds >= 0 else 0
    
    logger.info(f"Preparing to fetch candlesticks for {market_ticker} from {dt.datetime.fromtimestamp(start_ts_s, tz=timezone.utc)} to {dt.datetime.fromtimestamp(end_ts_s, tz=timezone.utc)}")

    with tqdm(total=total_expected_intervals, desc=f"Candles: {market_ticker[:30]}...", leave=False, unit="candle") as pbar: # Truncate ticker in desc
        while current_start_ts <= end_ts_s:
            chunk_max_duration_seconds = (MAX_PERIODS_PER_REQUEST - 1) * period_minutes * 60 
            chunk_end_ts = min(end_ts_s, current_start_ts + chunk_max_duration_seconds)
            api_path = f"/trade-api/v2/series/{series_ticker}/markets/{market_ticker}/candlesticks"
            params = {"start_ts": current_start_ts, "end_ts": chunk_end_ts, "period_interval": period_minutes}
            logger.debug(f"Requesting chunk for {market_ticker}: start_ts={current_start_ts}, end_ts={chunk_end_ts}")
            auth_headers = get_kalshi_auth_headers("GET", api_path) 
            if auth_headers is None:
                logger.error(f"Failed to get auth headers for {market_ticker}. Skipping chunk.")
                pbar.update(MAX_PERIODS_PER_REQUEST) 
                current_start_ts = chunk_end_ts + (period_minutes * 60) 
                time.sleep(API_DELAY_SECONDS)
                continue
            try:
                response = requests.get(f"{KALSHI_BASE_URL}{api_path}", headers=auth_headers, params=params, timeout=30)
                response.raise_for_status()
                api_response_data = response.json()
                candlesticks_from_api = api_response_data.get("candlesticks", [])
                if not fetch_candlesticks_for_market_first_candle_logged_this_run and candlesticks_from_api:
                    logger.info("Structure of the first candlestick received from API (this session):")
                    logger.info(json.dumps(candlesticks_from_api[0], indent=2))
                    fetch_candlesticks_for_market_first_candle_logged_this_run = True

                if candlesticks_from_api:
                    for candle_data in candlesticks_from_api:
                        ts = candle_data.get("end_period_ts")
                        if ts is None or ts < current_start_ts or ts > end_ts_s : 
                            continue 
                        trade_price_info = candle_data.get("price", {}) or {} 
                        yes_bid_info = candle_data.get("yes_bid", {}) or {}
                        yes_ask_info = candle_data.get("yes_ask", {}) or {}
                        all_candlesticks_for_this_market.append({
                            "market_ticker": market_ticker, 
                            "series_ticker": series_ticker,
                            "timestamp_s": ts,
                            "datetime_utc": dt.datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(),
                            "trade_open_cents": trade_price_info.get("open"), "trade_high_cents": trade_price_info.get("high"),
                            "trade_low_cents": trade_price_info.get("low"), "trade_close_cents": trade_price_info.get("close"),
                            "yes_bid_open_cents": yes_bid_info.get("open"), "yes_bid_high_cents": yes_bid_info.get("high"),
                            "yes_bid_low_cents": yes_bid_info.get("low"), "yes_bid_close_cents": yes_bid_info.get("close"),
                            "yes_ask_open_cents": yes_ask_info.get("open"), "yes_ask_high_cents": yes_ask_info.get("high"),
                            "yes_ask_low_cents": yes_ask_info.get("low"), "yes_ask_close_cents": yes_ask_info.get("close"),
                            "volume": candle_data.get("volume"), "open_interest": candle_data.get("open_interest")
                        })
                    pbar.update(len(candlesticks_from_api) if candlesticks_from_api else (chunk_end_ts - current_start_ts) // period_seconds)
                else:
                    pbar.update(max(1, (chunk_end_ts - current_start_ts + 1) // period_seconds))

                if candlesticks_from_api:
                    last_ts_in_chunk = candlesticks_from_api[-1].get("end_period_ts", chunk_end_ts)
                    current_start_ts = last_ts_in_chunk + (period_minutes * 60)
                else:
                    current_start_ts = chunk_end_ts + (period_minutes * 60)
            except requests.exceptions.HTTPError as http_err:
                logger.error(f"HTTP error for {market_ticker} chunk: {http_err} - Status: {response.status_code} - Response: {response.text[:200]}")
                pbar.update(MAX_PERIODS_PER_REQUEST) 
                current_start_ts = chunk_end_ts + (period_minutes * 60) 
            except Exception as e:
                logger.error(f"Generic error for {market_ticker} chunk: {e}")
                pbar.update(MAX_PERIODS_PER_REQUEST) 
                current_start_ts = chunk_end_ts + (period_minutes * 60)
            
            time.sleep(API_DELAY_SECONDS) 
            if current_start_ts > chunk_end_ts and chunk_end_ts == end_ts_s:
                 break
    logger.info(f"Fetched {len(all_candlesticks_for_this_market)} candlesticks for {market_ticker}.")
    return all_candlesticks_for_this_market

# --- Main Execution Logic ---
all_tickers_from_csv = []
# Find the latest CSV file with market tickers
try:
    list_of_files = glob.glob(f"kalshi_btc_hourly_market_tickers_{TARGET_SERIES_TICKER}_*.csv") 
    if not list_of_files:
        raise FileNotFoundError(f"No market ticker CSV files found matching pattern: kalshi_btc_hourly_market_tickers_{TARGET_SERIES_TICKER}_*.csv")
    latest_tickers_csv = max(list_of_files, key=os.path.getctime)
    logger.info(f"Reading market tickers from: {latest_tickers_csv}")
    market_tickers_df = pd.read_csv(latest_tickers_csv)
    all_tickers_from_csv = market_tickers_df['market_ticker'].unique().tolist()
    logger.info(f"Found {len(all_tickers_from_csv)} unique market tickers in CSV.")
except FileNotFoundError as fnf_err:
    logger.error(f"CRITICAL: {fnf_err}")
except Exception as e:
    logger.error(f"CRITICAL: Error reading or processing the tickers CSV: {e}")

# Determine the actual list of tickers to process based on the limit
if not all_tickers_from_csv:
    tickers_to_fetch = []
    logger.warning("No tickers loaded from CSV. Nothing to process.")
elif NUMBER_OF_MARKETS_TO_PROCESS > 0 and NUMBER_OF_MARKETS_TO_PROCESS < len(all_tickers_from_csv):
    tickers_to_fetch = all_tickers_from_csv[:NUMBER_OF_MARKETS_TO_PROCESS]
    logger.info(f"Processing the first {NUMBER_OF_MARKETS_TO_PROCESS} markets out of {len(all_tickers_from_csv)} available.")
else:
    tickers_to_fetch = all_tickers_from_csv # Process all if limit is 0 or >= total
    logger.info(f"Processing all {len(all_tickers_from_csv)} available markets (limit not restrictive).")


processed_markets_count = 0
failed_markets_count = 0
total_markets_to_attempt = len(tickers_to_fetch)

# --- Outer loop for progress across all tickers ---
# Update tqdm total based on the actual number of markets we will process
with tqdm(total=total_markets_to_attempt, desc="Total Markets Progress", unit="market") as market_pbar:
    for market_ticker in tickers_to_fetch:
        market_pbar.set_postfix_str(f"{market_ticker[:25]}...") 
        
        market_csv_filename = os.path.join(INDIVIDUAL_MARKET_DATA_DIR, f"{market_ticker}.csv")
        if os.path.exists(market_csv_filename):
            logger.info(f"Data for {market_ticker} already exists at {market_csv_filename}. Skipping.")
            market_pbar.update(1)
            processed_markets_count +=1 
            continue
        
        details = fetch_market_details(market_ticker)
        time.sleep(API_DELAY_SECONDS) 

        if details and details["open_ts"] and details["close_ts"]:
            if details["open_ts"] > details["close_ts"]:
                logger.warning(f"Market {market_ticker} has open_ts ({details['open_ts']}) after close_ts ({details['close_ts']}). Skipping.")
                failed_markets_count += 1
                market_pbar.update(1)
                continue
            if details["status"] not in ["closed", "settled", "finalized"]:
                 logger.warning(f"Market {market_ticker} status is '{details['status']}', not 'closed' or 'settled'. Candlestick data might be partial or unavailable. Proceeding with caution.")
            
            candlesticks_list = fetch_candlesticks_for_market(
                market_ticker=market_ticker,
                series_ticker=TARGET_SERIES_TICKER,
                start_ts_s=details["open_ts"],
                end_ts_s=details["close_ts"], 
                period_minutes=PERIOD_MINUTES
            )
            if candlesticks_list:
                market_df = pd.DataFrame(candlesticks_list)
                market_df.sort_values(by=['timestamp_s'], inplace=True)
                try:
                    market_df.to_csv(market_csv_filename, index=False)
                    logger.info(f"Successfully saved {len(market_df)} candlesticks for {market_ticker} to {market_csv_filename}")
                    processed_markets_count += 1
                except Exception as e:
                    logger.error(f"Error saving data for {market_ticker} to CSV {market_csv_filename}: {e}")
                    failed_markets_count += 1
            else:
                logger.info(f"No candlestick data returned or processed for {market_ticker}. No CSV created for this market.")
        else:
            logger.warning(f"Could not get valid open/close times for {market_ticker}. Skipping candlestick fetch and CSV save.")
            failed_markets_count += 1
        
        market_pbar.update(1) 

logger.info("--- Candlestick Data Fetching Complete ---")
logger.info(f"Total markets attempted in this run: {total_markets_to_attempt}")
logger.info(f"Markets successfully processed and CSV saved (or existed): {processed_markets_count}")
logger.info(f"Markets failed or skipped: {failed_markets_count}")
logger.info(f"Individual market CSVs are located in: {INDIVIDUAL_MARKET_DATA_DIR}")
print(f"\n--- Data Fetching Complete ---")
print(f"Data for individual markets saved in directory: {INDIVIDUAL_MARKET_DATA_DIR}")
print(f"Successfully processed: {processed_markets_count} markets.")
print(f"Failed/Skipped: {failed_markets_count} markets.")

In [24]:
# In a new notebook cell, after running Cells 1 & 2
# Make sure KALSHI_BASE_URL and private_key_global are set

problem_ticker = "KXBTCD-25MAY1509-T100249.99" 
details = fetch_market_details(problem_ticker) # fetch_market_details is from Cell 5 of notebook
if details:
    open_dt = dt.datetime.fromtimestamp(details["open_ts"], tz=timezone.utc)
    close_dt = dt.datetime.fromtimestamp(details["close_ts"], tz=timezone.utc)
    print(f"Ticker: {problem_ticker}")
    print(f"  Raw API open_time used: {details.get('raw_open_time_from_api_if_logged')}") # If you logged this in Cell 5
    print(f"  Parsed Open TS: {details['open_ts']} -> UTC: {open_dt.isoformat()}")
    print(f"  Parsed Close TS: {details['close_ts']} -> UTC: {close_dt.isoformat()}")
else:
    print(f"Could not fetch details for {problem_ticker}")

2025-05-18 14:36:29,648 - INFO - kalshi_fetch_20250515_192458.fetch_market_details - Fetching details for market: KXBTCD-25MAY1509-T100249.99
2025-05-18 14:36:29,658 - INFO - kalshi_fetch_20250515_192458.kalshi_api_get_request - Making GET request to: https://api.elections.kalshi.com/trade-api/v2/markets/KXBTCD-25MAY1509-T100249.99 with params: None
2025-05-18 14:36:29,889 - INFO - kalshi_fetch_20250515_192458.fetch_market_details - Details for KXBTCD-25MAY1509-T100249.99: Open: 1747310400 (2025-05-15T12:00:00Z), Close: 1747314000 (2025-05-15T13:00:00Z), Status: finalized


Ticker: KXBTCD-25MAY1509-T100249.99
  Raw API open_time used: None
  Parsed Open TS: 1747310400 -> UTC: 2025-05-15T12:00:00+00:00
  Parsed Close TS: 1747314000 -> UTC: 2025-05-15T13:00:00+00:00


In [20]:
# Cell 6: Organize Market CSVs into Date Directories

import os
import glob
import shutil # For moving files
import re
import datetime as dt
from pathlib import Path # Useful for path manipulation
# Ensure tqdm is imported if not already: from tqdm.notebook import tqdm

# --- Configuration ---
DATA_OUTPUT_DIR_BASE = "market_candlestick_data" # Base directory used in Cell 5
TARGET_SERIES_TICKER = "KXBTCD" # Ensure this is consistent

# --- Find the latest data directory ---
# This will find the directory like 'market_candlestick_data/KXBTCD_candlesticks_YYYYMMDD_HHMMSS_suffix'
data_directories = glob.glob(os.path.join(DATA_OUTPUT_DIR_BASE, f"{TARGET_SERIES_TICKER}_candlesticks_*"))
if not data_directories:
    logger.critical(f"CRITICAL: No data directories found in {DATA_OUTPUT_DIR_BASE} matching pattern {TARGET_SERIES_TICKER}_candlesticks_*")
    print(f"\nCRITICAL: Could not find a data directory to organize. Please run Cell 5 first.")
else:
    # Get the most recently created directory
    latest_data_dir = max(data_directories, key=os.path.getctime)
    logger.info(f"Found latest data directory: {latest_data_dir}")
    print(f"\nOrganizing data from: {latest_data_dir}")

    # --- Function to extract date component from ticker (re-using logic from Cell 5) ---
    # Note: This function assumes the ticker format includes YYMMMDDHH
    def get_date_from_ticker_filename(filename: str) -> str | None:
        # Expects filenames like KXBTCD-25MAY1520-T94249.99.csv
        # Extracts the part like "25MAY15"
        # Remove the .csv extension first
        base_name = filename.replace('.csv', '')
        # Use regex to find the YYMMMDD part between the first '-' and the next digit block (the hour)
        match = re.search(r'-(\d{2}[A-Z]{3}\d{2})\d{2}-', base_name) 
        if match:
            return match.group(1)
        # Fallback for slightly different patterns if any (less likely for these tickers)
        match = re.search(r'-(\d{2}[A-Z]{3}\d{2})', base_name) # Look for YYMMMDD at the end if no hour/strike is parsed
        if match:
             logger.warning(f"Used fallback date extraction for {filename}")
             return match.group(1)

        logger.warning(f"Could not extract date component from filename: {filename}")
        return None

    # --- Get all CSV files in the latest data directory ---
    all_csv_files = list(Path(latest_data_dir).glob("*.csv"))
    if not all_csv_files:
        logger.warning(f"No CSV files found in {latest_data_dir}. Nothing to organize.")
        print(f"No CSV files found in {latest_data_dir}. Nothing to organize.")
    else:
        logger.info(f"Found {len(all_csv_files)} CSV files to organize.")
        
        # --- Iterate and move files ---
        moved_count = 0
        failed_count = 0

        with tqdm(total=len(all_csv_files), desc="Organizing Files by Date", unit="file") as pbar:
            for file_path in all_csv_files:
                file_name = file_path.name # Get just the filename (e.g., "KXBTCD-25MAY1520-T94249.99.csv")
                
                date_component_str = get_date_from_ticker_filename(file_name)
                
                if date_component_str:
                    try:
                        # Parse the YYMMMDD string into a date object (e.g., '25MAY15' -> datetime(2025, 5, 15))
                        date_obj = dt.datetime.strptime(date_component_str, "%y%b%d").date()
                        # Format the date as YYYY-MM-DD for the subdirectory name
                        date_subdir_name = date_obj.strftime("%Y-%m-%d")

                        # Create the full path for the date subdirectory
                        date_subdir_path = os.path.join(latest_data_dir, date_subdir_name)
                        os.makedirs(date_subdir_path, exist_ok=True) # Create the subdirectory if it doesn't exist

                        # Define source and destination paths
                        source_path = file_path
                        destination_path = os.path.join(date_subdir_path, file_name)

                        # Move the file
                        shutil.move(source_path, destination_path)
                        # logger.debug(f"Moved {file_name} to {date_subdir_path}")
                        moved_count += 1
                    except ValueError:
                        logger.error(f"Could not parse date string '{date_component_str}' from filename {file_name}. Skipping.")
                        failed_count += 1
                    except Exception as e:
                        logger.error(f"Error organizing file {file_name}: {e}. Skipping.")
                        failed_count += 1
                else:
                    # get_date_from_ticker_filename already logged a warning
                    failed_count += 1

                pbar.update(1)

        logger.info("--- File Organization Complete ---")
        logger.info(f"Total files processed: {len(all_csv_files)}")
        logger.info(f"Files successfully moved: {moved_count}")
        logger.info(f"Files failed or skipped: {failed_count}")
        print(f"\n--- File Organization Complete ---")
        print(f"Successfully moved {moved_count} files into date subdirectories.")
        print(f"Failed to organize {failed_count} files.")
        print(f"Check the directory {latest_data_dir} for new date folders.")

2025-05-16 14:50:05,534 - INFO - kalshi_fetch_20250515_192458.<module> - Found latest data directory: market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets
2025-05-16 14:50:05,538 - INFO - kalshi_fetch_20250515_192458.<module> - Found 999 CSV files to organize.



Organizing data from: market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets


Organizing Files by Date:   0%|          | 0/999 [00:00<?, ?file/s]

2025-05-16 14:50:05,658 - INFO - kalshi_fetch_20250515_192458.<module> - --- File Organization Complete ---
2025-05-16 14:50:05,659 - INFO - kalshi_fetch_20250515_192458.<module> - Total files processed: 999
2025-05-16 14:50:05,659 - INFO - kalshi_fetch_20250515_192458.<module> - Files successfully moved: 999
2025-05-16 14:50:05,659 - INFO - kalshi_fetch_20250515_192458.<module> - Files failed or skipped: 0



--- File Organization Complete ---
Successfully moved 999 files into date subdirectories.
Failed to organize 0 files.
Check the directory market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets for new date folders.


In [22]:
# Cell 6: Organize Market CSVs into Date/Hour Folders

import os
import re
import shutil # For moving files
import glob
from tqdm.notebook import tqdm

# --- Configuration for this cell ---

# This should be the directory where Cell 5 saved the individual market CSVs.
# Let's try to get it dynamically if `INDIVIDUAL_MARKET_DATA_DIR` is still in scope.
# Otherwise, you might need to set it manually.
if 'INDIVIDUAL_MARKET_DATA_DIR' in globals() and os.path.exists(INDIVIDUAL_MARKET_DATA_DIR):
    SOURCE_DATA_DIR = INDIVIDUAL_MARKET_DATA_DIR
    logger.info(f"Using source directory from previous cell: {SOURCE_DATA_DIR}")
else:
    # !! MANUALLY SET THIS IF THE ABOVE IS NOT FOUND !!
    # Example: SOURCE_DATA_DIR = "/Users/omarabul-hassan/Desktop/projects/kalshi/notebooks/market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets"
    # Try to find the latest one if not in scope
    try:
        list_of_data_dirs = glob.glob(os.path.join("market_candlestick_data", f"{TARGET_SERIES_TICKER}_candlesticks_*"))
        if not list_of_data_dirs:
            raise FileNotFoundError("No candlestick data directories found.")
        SOURCE_DATA_DIR = max(list_of_data_dirs, key=os.path.getctime)
        logger.info(f"Dynamically found latest source directory: {SOURCE_DATA_DIR}")
    except Exception as e:
        logger.error(f"Could not find INDIVIDUAL_MARKET_DATA_DIR automatically. Please set SOURCE_DATA_DIR manually. Error: {e}")
        SOURCE_DATA_DIR = None # Needs to be set

# Base directory for the new organized structure
BASE_ORGANIZED_DIR = "organized_market_data" 
os.makedirs(BASE_ORGANIZED_DIR, exist_ok=True)
logger.info(f"Organized data will be placed under: {BASE_ORGANIZED_DIR}")

# Regex to parse the market ticker filename
# Example: KXBTCD-25MAY1522-T104499.99.csv
# We need to capture:
# 1. The full date part (e.g., 25MAY15)
# 2. The hour part (e.g., 22)
TICKER_FILENAME_REGEX = re.compile(r"^(KXBTCD)-(\d{2}[A-Z]{3}\d{2})(\d{2})-(T\d+\.\d{2})\.csv$")
# Group 1: Series (KXBTCD)
# Group 2: Date (e.g., 25MAY15)
# Group 3: Hour (e.g., 22)
# Group 4: Strike (e.g., T104499.99)

moved_files_count = 0
skipped_files_count = 0
failed_to_parse_count = 0

if SOURCE_DATA_DIR and os.path.isdir(SOURCE_DATA_DIR):
    csv_files = [f for f in os.listdir(SOURCE_DATA_DIR) if f.endswith('.csv')]
    logger.info(f"Found {len(csv_files)} CSV files in {SOURCE_DATA_DIR} to organize.")

    for filename in tqdm(csv_files, desc="Organizing Files"):
        match = TICKER_FILENAME_REGEX.match(filename)
        if match:
            series_ticker_part = match.group(1) # Should be KXBTCD
            date_part = match.group(2)          # e.g., 25MAY15
            hour_part = match.group(3)          # e.g., 22
            # strike_part = match.group(4)      # e.g., T104499.99 (not used for dir structure but good to have)

            # Create the target directory structure: BASE_ORGANIZED_DIR / DATE / HOUR
            target_date_dir = os.path.join(BASE_ORGANIZED_DIR, date_part)
            target_hour_dir = os.path.join(target_date_dir, hour_part)
            
            os.makedirs(target_hour_dir, exist_ok=True) # Create dirs if they don't exist

            source_filepath = os.path.join(SOURCE_DATA_DIR, filename)
            target_filepath = os.path.join(target_hour_dir, filename)

            try:
                if not os.path.exists(target_filepath): # Avoid error if re-running and file already moved
                    shutil.move(source_filepath, target_filepath)
                    # logger.debug(f"Moved: {source_filepath} -> {target_filepath}")
                    moved_files_count += 1
                else:
                    logger.warning(f"File already exists at target, skipping move: {target_filepath}")
                    # If you want to count these as "skipped because exists" rather than an error:
                    skipped_files_count += 1 
                    # Optionally, you could delete the source if it's a duplicate from a re-run attempt
                    # os.remove(source_filepath) 
                    # logger.info(f"Removed duplicate source: {source_filepath}")

            except Exception as e:
                logger.error(f"Error moving file {filename}: {e}")
                skipped_files_count += 1
        else:
            logger.warning(f"Filename {filename} did not match expected pattern. Skipping.")
            failed_to_parse_count += 1
            
    logger.info("--- File Organization Complete ---")
    logger.info(f"Files successfully moved: {moved_files_count}")
    logger.info(f"Files skipped (e.g., already existed at target or error): {skipped_files_count}")
    logger.info(f"Files whose names failed to parse: {failed_to_parse_count}")
    print(f"\n--- File Organization Complete ---")
    print(f"Moved {moved_files_count} files into the new structure under '{BASE_ORGANIZED_DIR}'.")
    if skipped_files_count > 0:
        print(f"Skipped {skipped_files_count} files (e.g., target existed or move error).")
    if failed_to_parse_count > 0:
        print(f"Could not parse filename for {failed_to_parse_count} files.")

else:
    logger.error(f"SOURCE_DATA_DIR '{SOURCE_DATA_DIR}' is not set or is not a valid directory. Please check the path.")
    print(f"Error: SOURCE_DATA_DIR '{SOURCE_DATA_DIR}' is not valid. Please set it correctly.")

2025-05-16 14:55:13,882 - INFO - kalshi_fetch_20250515_192458.<module> - Using source directory from previous cell: market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets
2025-05-16 14:55:13,883 - INFO - kalshi_fetch_20250515_192458.<module> - Organized data will be placed under: organized_market_data
2025-05-16 14:55:13,884 - INFO - kalshi_fetch_20250515_192458.<module> - Found 999 CSV files in market_candlestick_data/KXBTCD_candlesticks_20250515_205224_first_1000_markets to organize.


Organizing Files:   0%|          | 0/999 [00:00<?, ?it/s]

2025-05-16 14:55:13,977 - INFO - kalshi_fetch_20250515_192458.<module> - --- File Organization Complete ---
2025-05-16 14:55:13,978 - INFO - kalshi_fetch_20250515_192458.<module> - Files successfully moved: 999
2025-05-16 14:55:13,978 - INFO - kalshi_fetch_20250515_192458.<module> - Files skipped (e.g., already existed at target or error): 0
2025-05-16 14:55:13,978 - INFO - kalshi_fetch_20250515_192458.<module> - Files whose names failed to parse: 0



--- File Organization Complete ---
Moved 999 files into the new structure under 'organized_market_data'.


In [23]:
# In your Jupyter Notebook, after Cell 3 has run
market_outcomes_data = []
if 'all_matching_events_markets' in globals() and all_matching_events_markets:
    for event_data in all_matching_events_markets:
        event_close_date_iso = event_data.get('close_date') # This is the event resolution time
        for m_info in event_data['markets']:
            market_outcomes_data.append({
                'market_ticker': m_info['market_ticker'],
                'result': m_info['result'], # 'yes' or 'no'
                'event_resolution_time_iso': event_close_date_iso
            })
    market_outcomes_df = pd.DataFrame(market_outcomes_data)
    outcomes_csv_path = os.path.join(os.path.dirname(INDIVIDUAL_MARKET_DATA_DIR), "kalshi_btc_hourly_market_outcomes.csv") # Save it one level up from the specific run
    market_outcomes_df.to_csv(outcomes_csv_path, index=False)
    print(f"Market outcomes saved to: {outcomes_csv_path}")
else:
    print("Please run Cell 3 first to populate 'all_matching_events_markets'.")

Market outcomes saved to: market_candlestick_data/kalshi_btc_hourly_market_outcomes.csv
