In [1]:
# Shared Google Cloud settings for this notebook.
# BUCKET_NAME is interpolated into the gs:// paths read by the cells below.
# CLUSTER and REGION are not referenced by the cells visible here
# (presumably used when submitting to Dataproc — confirm).
PROJECT_ID = "kalshi-456121"
REGION = "us-central1"
CLUSTER = "kalshi-dataproc-cluster"
BUCKET_NAME = "kalshi-data-lake"

## Trades

In [2]:
import os
from os.path import expanduser

from pyspark.sql import SparkSession

# Preview a single trades Parquet file stored in the GCS data lake.

# GCS path of the Parquet file to inspect
gcs_path = f"gs://{BUCKET_NAME}/trades/historical/01_2025/trades_2025-04-16T19.parquet"

# Path to the GCS connector jar
gcs_connector_path = expanduser("~/Documents/gcloud/gcs-connector-hadoop3-latest.jar")

# Service-account key file. GOOGLE_APPLICATION_CREDENTIALS wins when it is set;
# otherwise fall back to the key under the current user's home directory so the
# notebook no longer depends on one specific absolute /Users/... path.
gcp_keyfile_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    expanduser("~/hello/kalshi/creds/gcp-sa-key.json"),
)

# Start a local Spark session with the GCS connector
spark = (
    SparkSession.builder
    .appName("QueryGCSParquet")
    .config("spark.jars", gcs_connector_path)
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcp_keyfile_path)
    # Reuse the notebook-level constant instead of repeating the project id literal.
    .config("spark.hadoop.fs.gs.project.id", PROJECT_ID)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .getOrCreate()
)

try:
    # Load the Parquet file from GCS
    df = spark.read.parquet(gcs_path)

    # Show a few columns of the first 10 rows
    df.select("ticker", "count", "yes_price", "created_time").show(10)
finally:
    # Always release the session, even if the read or the query fails.
    spark.stop()

25/04/16 16:06:58 WARN Utils: Your hostname, Jakes-MacBook-Pro-1815.local resolves to a loopback address: 127.0.0.1; using 100.73.104.202 instead (on interface en0)
25/04/16 16:06:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/04/16 16:06:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+--------------------+-----+---------+--------------------+
|              ticker|count|yes_price|        created_time|
+--------------------+-----+---------+--------------------+
|KXNASDAQ100U-25AP...|   50|       89|2025-04-16T19:41:...|
|KXFEDDECISION-25M...|    5|       12|2025-04-16T19:41:...|
|KXNBAGAME-25APR16...|  179|       53|2025-04-16T19:41:...|
|KXFEDDECISION-25M...|    4|       12|2025-04-16T19:41:...|
|KXNASDAQ100U-25AP...|   10|       55|2025-04-16T19:41:...|
|KXBTCD-25APR1616-...|  184|       66|2025-04-16T19:41:...|
|KXBTCD-25APR1616-...|   91|       64|2025-04-16T19:41:...|
|KXSECPRESSMENTION...|  100|       35|2025-04-16T19:41:...|
|KXSECPRESSMENTION...|   34|       33|2025-04-16T19:41:...|
|KXSECPRESSMENTION...|   13|       33|2025-04-16T19:41:...|
+--------------------+-----+---------+--------------------+
only showing top 10 rows



## Markets

In [8]:
import os
from os.path import expanduser

from pyspark.sql import SparkSession

# Print the last 10 rows of the markets Parquet file stored in the GCS data lake.

# GCS path of the Parquet file to inspect
gcs_path = f"gs://{BUCKET_NAME}/markets/01012025_to_04242025v2.parquet"

# Path to the GCS connector jar
gcs_connector_path = expanduser("~/Documents/gcloud/gcs-connector-hadoop3-latest.jar")

# Service-account key file. GOOGLE_APPLICATION_CREDENTIALS wins when it is set;
# otherwise fall back to the key under the current user's home directory so the
# notebook no longer depends on one specific absolute /Users/... path.
gcp_keyfile_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    expanduser("~/hello/kalshi/creds/gcp-sa-key.json"),
)

# Start a local Spark session with the GCS connector
spark = (
    SparkSession.builder
    .appName("QueryGCSParquet")
    .config("spark.jars", gcs_connector_path)
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcp_keyfile_path)
    # Reuse the notebook-level constant instead of repeating the project id literal.
    .config("spark.hadoop.fs.gs.project.id", PROJECT_ID)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .getOrCreate()
)

try:
    # Load the Parquet file from GCS
    df = spark.read.parquet(gcs_path)

    # tail(10) brings only the last 10 rows to the driver; collect()[-10:]
    # materialised the entire dataset in driver memory just to print 10 rows.
    last_10 = df.tail(10)
    for row in last_10:
        print(row)
finally:
    # Always release the session, even if the read or the query fails.
    spark.stop()

Row(ticker='KXETHD-25APR2515-T1279.99', event_ticker='KXETHD-25APR2515', market_type='binary', title='Ethereum price at Apr 25, 2025 at 3pm EDT?', subtitle='$1,280 or above', yes_sub_title='$1,280 or above', no_sub_title='$1,280 or above', open_time='2025-04-25T18:00:00Z', close_time='2025-04-25T19:00:00Z', expected_expiration_time='2025-04-25T19:05:00Z', expiration_time='2025-05-02T19:00:00Z', latest_expiration_time='2025-05-02T19:00:00Z', settlement_timer_seconds=60, status='initialized', response_price_units='usd_cent', notional_value=100, tick_size=1, yes_bid=0, yes_ask=0, no_bid=100, no_ask=100, last_price=0, previous_yes_bid=0, previous_yes_ask=0, previous_price=0, volume=0, volume_24h=0, liquidity=0, open_interest=0, result='', can_close_early=True, expiration_value='', category='', risk_limit_cents=0, strike_type='greater', custom_strike=None, rules_primary="If the simple average of the sixty seconds of CF Benchmarks' Ethereum Real-Time Index (ERTI) before 3 PM EDT is above 127

                                                                                

## Events

In [3]:
import os
from os.path import expanduser

from pyspark.sql import SparkSession

# Preview the events Parquet file stored in the GCS data lake.

# GCS path of the Parquet file to inspect
gcs_path = f"gs://{BUCKET_NAME}/events/all.parquet"

# Path to the GCS connector jar
gcs_connector_path = expanduser("~/Documents/gcloud/gcs-connector-hadoop3-latest.jar")

# Service-account key file. GOOGLE_APPLICATION_CREDENTIALS wins when it is set;
# otherwise fall back to the key under the current user's home directory so the
# notebook no longer depends on one specific absolute /Users/... path.
gcp_keyfile_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    expanduser("~/hello/kalshi/creds/gcp-sa-key.json"),
)

# Start a local Spark session with the GCS connector
spark = (
    SparkSession.builder
    .appName("QueryGCSParquet")
    .config("spark.jars", gcs_connector_path)
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcp_keyfile_path)
    # Reuse the notebook-level constant instead of repeating the project id literal.
    .config("spark.hadoop.fs.gs.project.id", PROJECT_ID)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .getOrCreate()
)

try:
    # Load the Parquet file from GCS
    df = spark.read.parquet(gcs_path)

    # Show the first 10 rows
    df.show(10)
finally:
    # Always release the session, even if the read or the query fails.
    spark.stop()

25/04/24 11:55:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+--------------------+-----------------+--------------------+--------------------+----------------------+------------------+--------------------+-----------+-------------+
|        event_ticker|    series_ticker|           sub_title|               title|collateral_return_type|mutually_exclusive|            category|strike_date|strike_period|
+--------------------+-----------------+--------------------+--------------------+----------------------+------------------+--------------------+-----------+-------------+
|      KXROBOTMARS-35|      KXROBOTMARS|         Before 2035|Will a humanoid r...|                      |             false|Science and Techn...|       NULL|         NULL|
|       KXNEXTPOPE-35|       KXNEXTPOPE|         Before 2035|Who will the next...|                MECNET|              true|               World|       NULL|         NULL|
|    KXLALEADEROUT-35|    KXLALEADEROUT|         Before 2035|Which of these La...|                MECNET|              true|            Poli

In [None]:
import requests
import pprint

def fetch_markets(url="https://api.elections.kalshi.com/trade-api/v2/markets", timeout=10):
    """Fetch the markets listing and print a summary of the first market.

    Prints the first market dict, its ``status`` value and its keys.

    Args:
        url: Endpoint to query; defaults to the Kalshi v2 markets endpoint.
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    # Without a timeout, requests can block indefinitely on a stalled connection.
    r = requests.get(url, timeout=timeout)
    # Fail with the HTTP status instead of trying to parse an error body.
    r.raise_for_status()
    markets = r.json().get("markets", [])
    if not markets:
        # An empty listing previously crashed with IndexError on markets[0].
        print("No markets returned")
        return
    first_market = markets[0]
    print(first_market)
    print(first_market['status'])
    print("Keys: ", first_market.keys())

fetch_markets()

{'can_close_early': True,
 'category': '',
 'close_time': '2026-12-31T15:00:00Z',
 'custom_strike': {'Candidate': 'Mike Lawler'},
 'event_ticker': 'KXGOVNYNOMR-26',
 'expected_expiration_time': '2026-12-31T15:00:00Z',
 'expiration_time': '2026-12-31T15:00:00Z',
 'expiration_value': '',
 'last_price': 0,
 'latest_expiration_time': '2026-12-31T15:00:00Z',
 'liquidity': 0,
 'market_type': 'binary',
 'no_ask': 100,
 'no_bid': 100,
 'no_sub_title': 'Mike Lawler',
 'notional_value': 100,
 'open_interest': 0,
 'open_time': '2025-04-17T14:00:00Z',
 'previous_price': 0,
 'previous_yes_ask': 0,
 'previous_yes_bid': 0,
 'response_price_units': 'usd_cent',
 'result': '',
 'risk_limit_cents': 0,
 'rules_primary': 'If Mike Lawler wins the nomination for the Republican Party '
                  'to contest the 2026 New York Governorship, then the market '
                  'resolves to Yes.',
 'rules_secondary': '',
 'settlement_timer_seconds': 300,
 'status': 'initialized',
 'strike_type': 'custom',

In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# NOTE(review): importing `round` here shadows Python's builtin round() for the
# rest of the kernel session; every round(...) below is the Spark column function.
from pyspark.sql.functions import last, when, col, lag, round, avg, expr, to_date, sum as spark_sum, monotonically_increasing_id
import warnings
warnings.filterwarnings("ignore")

import os
from os.path import expanduser
gcs_connector_path = expanduser("~/Documents/gcloud/gcs-connector-hadoop3-latest.jar")

# Service-account key file. GOOGLE_APPLICATION_CREDENTIALS wins when it is set;
# otherwise fall back to the key under the current user's home directory so the
# notebook no longer depends on one specific absolute /Users/... path.
gcp_keyfile_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    expanduser("~/hello/kalshi/creds/gcp-sa-key.json"),
)


def _add_rolling_indicators(daily_df, days, rolling_window, momentum_window):
    """Append the `<days>_day` rolling-average and momentum columns.

    Adds, in this order: the rounded rolling averages of the total / yes / no
    daily bid amounts, the difference between each daily amount and its rolling
    average, and the yes / no closing-price change versus `days` rows earlier.

    Args:
        daily_df: DataFrame with daily_total_bid_amount[_yes|_no],
            yes_price_close and no_price_close columns.
        days: Number used in the column names and as the lag offset.
        rolling_window: Window spec the averages are computed over.
        momentum_window: Ordered window spec used for the lag lookups.

    Returns:
        A new DataFrame with the eight indicator columns appended.
    """
    prefix = f"{days}_day"
    avg_total = f"{prefix}_avg_invested_amount"
    avg_yes = f"{prefix}_avg_yes_invested_amount"
    avg_no = f"{prefix}_avg_no_invested_amount"
    return (
        daily_df
        .withColumn(avg_total, round(avg("daily_total_bid_amount").over(rolling_window), 2))
        .withColumn(avg_yes, round(avg("daily_total_bid_amount_yes").over(rolling_window), 2))
        .withColumn(avg_no, round(avg("daily_total_bid_amount_no").over(rolling_window), 2))
        # Reuse the averages computed above instead of re-evaluating each window.
        .withColumn(f"daily_total_vs_{prefix}_avg", col("daily_total_bid_amount") - col(avg_total))
        .withColumn(f"daily_yes_total_vs_{prefix}_avg", col("daily_total_bid_amount_yes") - col(avg_yes))
        .withColumn(f"daily_no_total_vs_{prefix}_avg", col("daily_total_bid_amount_no") - col(avg_no))
        .withColumn(f"{prefix}_yes_momentum", col("yes_price_close") - lag("yes_price_close", days).over(momentum_window))
        .withColumn(f"{prefix}_no_momentum", col("no_price_close") - lag("no_price_close", days).over(momentum_window))
    )


# Start Spark Session
spark = (
    SparkSession.builder
    .appName("JoinAndAggregateKalshi")
    .config("spark.jars", gcs_connector_path)
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcp_keyfile_path)
    # Reuse the notebook-level constant instead of repeating the project id literal.
    .config("spark.hadoop.fs.gs.project.id", PROJECT_ID)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .getOrCreate()
)

try:
    # ---------------- Trades ----------------- #
    folder_path = f"gs://{BUCKET_NAME}/trades/"

    # Read every Parquet file directly under trades/ into one DataFrame
    trades_df = spark.read.parquet(f"gs://{BUCKET_NAME}/trades/*.parquet")

    # bid_amount = price * contract count / 100 on the taker's side, rounded to
    # 2 decimals; rows whose taker_side is neither "yes" nor "no" get 0.
    trades_df = trades_df.withColumn(
        "bid_amount",
        round(
            when(col("taker_side") == "yes", col("yes_price") * col("count") / 100)
            .when(col("taker_side") == "no", (col("no_price") * col("count") / 100))
            .otherwise(0), 2
        )
    )
    trades_df = trades_df.withColumn("created_date", to_date("created_time"))
    trades_df.show(10)

    # Windows per ticker, ordered by trade date.
    # NOTE(review): the rolling windows and the lag offsets count ROWS of the
    # daily aggregate, not calendar days — a ticker with no trades on some days
    # reaches further back than 7 / 3 days. Confirm this is intended.
    rolling_7_day_window = Window.partitionBy("ticker").orderBy("created_date").rowsBetween(-6, 0)
    rolling_3_day_window = Window.partitionBy("ticker").orderBy("created_date").rowsBetween(-2, 0)
    # Whole (ticker, day) partition ordered by trade time, so last(..., ignorenulls)
    # yields the same closing value on every row of that day.
    price_window = Window.partitionBy("ticker", "created_date").orderBy(col("created_time")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    momentum_window = Window.partitionBy("ticker").orderBy("created_date")

    # Closing price per side = last non-null price of that day's trades on that side
    trades_df = trades_df.withColumn("yes_price_close", last(when(col("taker_side") == "yes", col("yes_price")), True).over(price_window)) \
                         .withColumn("no_price_close", last(when(col("taker_side") == "no", col("no_price")), True).over(price_window))

    # ---------------- Trade Aggregations / Indicators ----------------- #
    # One row per (date, ticker); the closing prices are grouping keys because
    # they are already constant within each (ticker, date).
    aggs_df = trades_df.groupBy(
        "created_date",
        "ticker",
        "yes_price_close",
        "no_price_close",
    ).agg(
        spark_sum(when(col("taker_side") == "yes", col("bid_amount")).otherwise(0)).alias("daily_total_bid_amount_yes"),
        spark_sum(when(col("taker_side") == "no", col("bid_amount")).otherwise(0)).alias("daily_total_bid_amount_no"),
        spark_sum("bid_amount").alias("daily_total_bid_amount")
    )

    # 7-day indicators first, then 3-day, matching the original column order.
    aggs_df = _add_rolling_indicators(aggs_df, 7, rolling_7_day_window, momentum_window)
    aggs_df = _add_rolling_indicators(aggs_df, 3, rolling_3_day_window, momentum_window)

    # ---------------- Markets, Events ----------------- #
    markets_df = spark.read.parquet(f"gs://{BUCKET_NAME}/markets/01012025_to_04242025.parquet")
    markets_df = markets_df.select("ticker", "event_ticker", col("title").alias("market_title"), col("subtitle").alias("market_subtitle"), "open_time", "close_time", "market_type")

    events_df = spark.read.parquet(f"gs://{BUCKET_NAME}/events/all.parquet")
    events_df = events_df.select("event_ticker", "series_ticker", col("title").alias("event_title"), col("sub_title").alias("event_subtitle"), "category")

    # ---------------- Joined ----------------- #
    # Inner joins: daily aggregates without a matching market or event are dropped.
    joined_df = aggs_df.join(markets_df.alias("m"), on=["ticker"], how="inner") \
                        .join(events_df.alias("e"), on=col("m.event_ticker") == col("e.event_ticker"), how="inner")

    joined_df.filter(col("ticker") == "KXFEDDECISION-25MAY-C25").select(
        # Identifiers
        "m.ticker",
        "e.event_ticker",
        "series_ticker",
        "created_date",

        # Market/Event Info
        "event_title",
        "market_title",
        "market_subtitle",
        "event_subtitle",
        "open_time",
        "close_time",
        "market_type",
        "category",

        # Prices
        "yes_price_close",
        "no_price_close",

        # Daily Totals
        "daily_total_bid_amount_yes",
        "daily_total_bid_amount_no",
        "daily_total_bid_amount",

        # 7-Day Averages
        "7_day_avg_invested_amount",
        "7_day_avg_yes_invested_amount",
        "7_day_avg_no_invested_amount",
        "daily_total_vs_7_day_avg",
        "daily_yes_total_vs_7_day_avg",
        "daily_no_total_vs_7_day_avg",
        "7_day_yes_momentum",
        "7_day_no_momentum",

        # 3-Day Averages
        "3_day_avg_invested_amount",
        "3_day_avg_yes_invested_amount",
        "3_day_avg_no_invested_amount",
        "daily_total_vs_3_day_avg",
        "daily_yes_total_vs_3_day_avg",
        "daily_no_total_vs_3_day_avg",
        "3_day_yes_momentum",
        "3_day_no_momentum"
    ).show(10)
finally:
    # Always release the session, even if a read, join or show fails.
    spark.stop()

                                                                                

+--------------------+--------------------+-----+--------------------+---------+--------+----------+----------+------------+
|            trade_id|              ticker|count|        created_time|yes_price|no_price|taker_side|bid_amount|created_date|
+--------------------+--------------------+-----+--------------------+---------+--------+----------+----------+------------+
|13e20041-a226-47f...|KXWMARMAD-25R64G1...|   73|2025-03-21T23:59:...|       99|       1|        no|      0.73|  2025-03-21|
|40d20184-6d4f-4d3...|KXMARMAD-25R64G7-...|    5|2025-03-21T23:59:...|       49|      51|       yes|      2.45|  2025-03-21|
|48e5bf41-187a-44f...|KXMARMAD-25R64G30-UK|   25|2025-03-21T23:59:...|       92|       8|        no|       2.0|  2025-03-21|
|388abf18-291b-4dc...|KXMARMAD-25R64G7-UNM|  234|2025-03-21T23:59:...|       53|      47|       yes|    124.02|  2025-03-21|
|980f8084-2678-419...|   KXELECTUKRAINE-26|  207|2025-03-21T23:59:...|       41|      59|        no|    122.13|  2025-03-21|




+--------------------+-------------------+-------------+------------+--------------------+--------------------+---------------+--------------+--------------------+--------------------+-----------+---------+---------------+--------------+--------------------------+-------------------------+----------------------+-------------------------+-----------------------------+----------------------------+------------------------+----------------------------+---------------------------+------------------+-----------------+-------------------------+-----------------------------+----------------------------+------------------------+----------------------------+---------------------------+------------------+-----------------+
|              ticker|       event_ticker|series_ticker|created_date|         event_title|        market_title|market_subtitle|event_subtitle|           open_time|          close_time|market_type| category|yes_price_close|no_price_close|daily_total_bid_amount_yes|daily_total_bi

                                                                                