# 📈 PySpark Market Tick Simulation & Analysis
This notebook simulates live market tick data and performs real-time analysis using PySpark — without needing Kafka.

In [1]:
# ✅ Step 1: Install & Initialize Spark
# # Install if not already
!pip install pyspark findspark

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MarketTickSimulation") \
    .master("local[*]") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

print("✅ Spark session ready")


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
✅ Spark session ready


In [2]:

# ✅ Step 2: Define Schema & Simulated Tick Generator
#
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
import pandas as pd
import random
import datetime

schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("bid", DoubleType(), True),
    StructField("ask", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
])

def generate_tick(symbol="XAUUSD"):
    bid = round(random.uniform(1850, 1860), 3)
    ask = round(bid + random.uniform(0.01, 0.05), 3)
    ts = datetime.datetime.now()
    return [(symbol, bid, ask, ts)]


In [3]:
# ✅ Step 3: Simulate Live Stream (10 ticks)

import time
from pyspark.sql import Row

live_ticks = []

# Simulate 10 ticks
for i in range(10):
    tick = generate_tick()[0]
    live_ticks.append(tick)

    # Convert to Spark DataFrame
    tick_df = spark.createDataFrame(live_ticks, schema=schema)
    tick_df.show(truncate=False)

    # Optional: store or analyze here

    time.sleep(1)  # Simulate real-time tick


+------+------+--------+--------------------------+
|symbol|bid   |ask     |timestamp                 |
+------+------+--------+--------------------------+
|XAUUSD|1858.1|1858.122|2025-07-01 14:43:34.232424|
+------+------+--------+--------------------------+

+------+--------+--------+--------------------------+
|symbol|bid     |ask     |timestamp                 |
+------+--------+--------+--------------------------+
|XAUUSD|1858.1  |1858.122|2025-07-01 14:43:34.232424|
|XAUUSD|1856.527|1856.56 |2025-07-01 14:43:51.099475|
+------+--------+--------+--------------------------+

+------+--------+--------+--------------------------+
|symbol|bid     |ask     |timestamp                 |
+------+--------+--------+--------------------------+
|XAUUSD|1858.1  |1858.122|2025-07-01 14:43:34.232424|
|XAUUSD|1856.527|1856.56 |2025-07-01 14:43:51.099475|
|XAUUSD|1852.924|1852.935|2025-07-01 14:43:52.81258 |
+------+--------+--------+--------------------------+

+------+--------+--------+---------

In [4]:

# ✅ Step 4: Compute mid price, moving avg & change%

from pyspark.sql.functions import col, avg, lag, expr
from pyspark.sql.window import Window

window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)
window_spec_lag = Window.orderBy("timestamp") # Define a separate window spec for lag

tick_df = spark.createDataFrame(live_ticks, schema=schema)

tick_df = tick_df \
    .withColumn("mid_price", (col("bid") + col("ask")) / 2) \
    .withColumn("ma_5", avg("mid_price").over(window_spec)) \
    .withColumn("prev_price", lag("mid_price", 1).over(window_spec_lag)) \
    .withColumn("pct_change", expr("ROUND((mid_price - prev_price)/prev_price * 100, 4)"))

tick_df.select("symbol", "mid_price", "ma_5", "pct_change", "timestamp").show(truncate=False)

+------+------------------+------------------+----------+--------------------------+
|symbol|mid_price         |ma_5              |pct_change|timestamp                 |
+------+------------------+------------------+----------+--------------------------+
|XAUUSD|1858.1109999999999|1858.1109999999999|NULL      |2025-07-01 14:43:34.232424|
|XAUUSD|1856.5435         |1857.3272499999998|-0.0844   |2025-07-01 14:43:51.099475|
|XAUUSD|1852.9295         |1855.8613333333333|-0.1947   |2025-07-01 14:43:52.81258 |
|XAUUSD|1858.6819999999998|1856.5665         |0.3105    |2025-07-01 14:43:54.940799|
|XAUUSD|1856.8645000000001|1856.6261         |-0.0978   |2025-07-01 14:43:56.394726|
|XAUUSD|1853.9185         |1855.7876         |-0.1587   |2025-07-01 14:43:57.896101|
|XAUUSD|1854.7555         |1855.4299999999998|0.0451    |2025-07-01 14:43:59.318023|
|XAUUSD|1857.7945         |1856.4029999999998|0.1638    |2025-07-01 14:44:00.851085|
|XAUUSD|1856.272          |1855.9209999999998|-0.082    |2025-07-

# Portfolio VaR Calculation Using PySpark


We’ll simulate portfolio prices for multiple assets, compute daily returns, and calculate a simple Historical Simulation VaR.

##🔹 Step 1: Simulate portfolio price data (batch)

In [5]:
# Step 1: Start SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PortfolioVaR") \
    .master("local[*]") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

print("SparkSession started")

# Step 2: Import types
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
import datetime
import random

# Step 3: Prepare your simulated data
symbols = ["AAPL", "GOOG", "TSLA"]
days = 30
price_data = []

base_prices = {"AAPL": 150, "GOOG": 2700, "TSLA": 700}
start_date = datetime.date.today() - datetime.timedelta(days=days)

for day in range(days):
    date = start_date + datetime.timedelta(days=day)
    for symbol in symbols:
        base = base_prices[symbol]
        price = base * (1 + random.gauss(0, 0.01))
        price_data.append((symbol, float(price), datetime.datetime.combine(date, datetime.time())))

schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("date", TimestampType(), True),
])

prices_df = spark.createDataFrame(price_data, schema=schema)
prices_df.show(5, truncate=False)


SparkSession started
+------+------------------+-------------------+
|symbol|price             |date               |
+------+------------------+-------------------+
|AAPL  |149.27904062346965|2025-06-01 00:00:00|
|GOOG  |2694.148370538865 |2025-06-01 00:00:00|
|TSLA  |694.5206460117872 |2025-06-01 00:00:00|
|AAPL  |149.47861159215623|2025-06-02 00:00:00|
|GOOG  |2690.5589791564244|2025-06-02 00:00:00|
+------+------------------+-------------------+
only showing top 5 rows





>🔹 Step 2: Calculate daily returns per asset



In [6]:
# Window spec by symbol ordered by date
window_spec = Window.partitionBy("symbol").orderBy("date")

returns_df = prices_df \
    .withColumn("prev_price", lag("price").over(window_spec)) \
    .withColumn("return", (col("price") - col("prev_price")) / col("prev_price")) \
    .na.drop()

returns_df.show(5, truncate=False)


+------+------------------+-------------------+------------------+---------------------+
|symbol|price             |date               |prev_price        |return               |
+------+------------------+-------------------+------------------+---------------------+
|AAPL  |149.47861159215623|2025-06-02 00:00:00|149.27904062346965|0.001336898789361564 |
|AAPL  |148.8572192194209 |2025-06-03 00:00:00|149.47861159215623|-0.004157065456499972|
|AAPL  |149.60441201818296|2025-06-04 00:00:00|148.8572192194209 |0.005019526783317565 |
|AAPL  |148.26196167995175|2025-06-05 00:00:00|149.60441201818296|-0.008973333875126896|
|AAPL  |149.03475129817176|2025-06-06 00:00:00|148.26196167995175|0.005212325598984036 |
+------+------------------+-------------------+------------------+---------------------+
only showing top 5 rows



## 🔹 Step 3: Simulate portfolio weights & calculate portfolio returns

In [8]:
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql import functions as F # Import PySpark SQL functions with alias F
from pyspark.sql.functions import col

# Assign fixed weights for demo
weights = {"AAPL": 0.4, "GOOG": 0.4, "TSLA": 0.2}

# Add weight column
weights_expr = F.create_map([F.lit(x) for x in sum(weights.items(), ())])

weighted_returns_df = returns_df.withColumn("weight", weights_expr[col("symbol")])
weighted_returns_df = weighted_returns_df.withColumn("weighted_return", col("return") * col("weight"))

# Aggregate portfolio returns by date
portfolio_returns_df = weighted_returns_df.groupBy("date").agg(spark_sum("weighted_return").alias("portfolio_return"))
portfolio_returns_df.orderBy("date").show(5, truncate=False)

+-------------------+---------------------+
|date               |portfolio_return     |
+-------------------+---------------------+
|2025-06-02 00:00:00|7.565603612609529E-4 |
|2025-06-03 00:00:00|-8.357450757989856E-4|
|2025-06-04 00:00:00|0.008064735609741759 |
|2025-06-05 00:00:00|-0.006473728662978609|
|2025-06-06 00:00:00|-0.002960881131908823|
+-------------------+---------------------+
only showing top 5 rows



##🔹 Step 4: Calculate Historical VaR at 95% confidence level

In [9]:
# Collect portfolio returns as a list for VaR calc (small demo)
returns_list = [row["portfolio_return"] for row in portfolio_returns_df.collect()]

import numpy as np

# Calculate 5th percentile loss (VaR)
var_95 = np.percentile(returns_list, 5)

print(f"📉 Portfolio 1-day 95% VaR estimate: {var_95:.4%}")


📉 Portfolio 1-day 95% VaR estimate: -1.3861%
