In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import numpy as np
import pandas as pd

In [19]:
spark = SparkSession.builder.appName("Lab7").getOrCreate()

In [20]:
df = spark.read.csv("stock_data.csv", header=True, inferSchema=True)

In [21]:
returns = df.select("Close").rdd.flatMap(lambda x: x).collect()
returns = np.diff(returns) / returns[:-1]    

In [22]:
drift = np.mean(returns)
volatility = np.std(returns)

In [23]:
def monte_carlo_simulation(start_price, drift, volatility, num_steps=252, num_simulations=1000):
    results = []
    for _ in range(num_simulations):
        price_series = [start_price]
        for _ in range(num_steps):
            shock = np.random.normal(0, 1)  
            price = price_series[-1] * np.exp(drift + volatility * shock)
            price_series.append(price)
        results.append(price_series)
    return results

In [24]:
start_price = df.select("Close").tail(1)[0][0]  
num_steps = 252 
num_simulations = 1000  

In [25]:
results_rdd = spark.sparkContext.parallelize([start_price] * num_simulations).map(
    lambda start: monte_carlo_simulation(start, drift, volatility, num_steps, 1)[0]
)

In [27]:
simulated_prices = results_rdd.collect()
simulated_prices_df = pd.DataFrame(simulated_prices).T
simulated_prices_df.columns = [f'Simulation_{i+1}' for i in range(num_simulations)]
print(simulated_prices_df.head())

   Simulation_1  Simulation_2  Simulation_3  Simulation_4  Simulation_5  \
0     52.950000     52.950000     52.950000     52.950000     52.950000   
1     53.070409     53.070409     53.070409     53.070409     53.070409   
2     53.191092     53.191092     53.191092     53.191092     53.191092   
3     53.312050     53.312050     53.312050     53.312050     53.312050   
4     53.433282     53.433282     53.433282     53.433282     53.433282   

   Simulation_6  Simulation_7  Simulation_8  Simulation_9  Simulation_10  ...  \
0     52.950000     52.950000     52.950000     52.950000      52.950000  ...   
1     53.070409     53.070409     53.070409     53.070409      53.070409  ...   
2     53.191092     53.191092     53.191092     53.191092      53.191092  ...   
3     53.312050     53.312050     53.312050     53.312050      53.312050  ...   
4     53.433282     53.433282     53.433282     53.433282      53.433282  ...   

   Simulation_991  Simulation_992  Simulation_993  Simulation_

In [28]:
spark.stop()