In [36]:
import pyspark
import os
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, LongType, ArrayType, DoubleType
from pyspark.sql.functions import col, explode
import pandas as pd
from datetime import datetime
from sklearn.linear_model import LinearRegression
from numpy.random import multivariate_normal, seed
import random

# Set up environment: make PySpark use this kernel's Python interpreter for both workers and driver
os.environ.update(PYSPARK_PYTHON=sys.executable, PYSPARK_DRIVER_PYTHON=sys.executable)

# Initialize Spark session (driver memory 16g, app name 'chapter_8')
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16g")
    .appName('chapter_8')
    .getOrCreate()
)

# Load stock data.
# Relative paths resolve against the kernel's working directory; adjust STOCK_FILES if the data lives elsewhere.
STOCK_FILES = ["stocks/ABAX.csv", "stocks/AAME.csv", "stocks/AEPI.csv"]
stocks = spark.read.csv(STOCK_FILES, header='true', inferSchema='true')
# Symbol = source file name without directory or extension, e.g. ".../ABAX.csv" -> "ABAX".
# r"\." is a raw string: a plain "\." literal is an invalid escape sequence
# (DeprecationWarning, SyntaxWarning since Python 3.12) even though it yields the same regex.
stocks = (
    stocks
    .withColumn("Symbol", fun.input_file_name())
    .withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1))
    .withColumn("Symbol", fun.element_at(fun.split("Symbol", r"\."), 1))
)

# Filter stocks with sufficient data points. The per-symbol row count covers every row in the
# files, i.e. it is taken before the date-window filter below.
# NOTE(review): 260 * 5 presumably means ~260 trading days/year over 5 years, plus 10 rows of slack.
MIN_ROWS_PER_SYMBOL = 260 * 5 + 10
stocks = (
    stocks
    .withColumn('count', fun.count('Symbol').over(Window.partitionBy('Symbol')))
    .filter(fun.col('count') > MIN_ROWS_PER_SYMBOL)
)

# Parse dates with the pattern 'dd-MMM-yy' (e.g. 23-Oct-14) using Spark's legacy time parser.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
stocks = stocks.withColumn('Date', fun.to_date(fun.to_timestamp(fun.col('Date'), 'dd-MMM-yy')))

# Keep 2009-10-23 .. 2014-10-23, inclusive on both ends.
stocks = stocks.filter(fun.col('Date').between(datetime(2009, 10, 23), datetime(2014, 10, 23)))

# Load and filter factors data.
# NOTE(review): these are the same three stock CSVs that are loaded as `stocks`, so the "factors"
# are the stocks themselves rather than separate market-factor series — TODO confirm intended files.
FACTOR_FILES = ["stocks/ABAX.csv", "stocks/AAME.csv", "stocks/AEPI.csv"]
factors = spark.read.csv(FACTOR_FILES, header='true', inferSchema='true')
# Symbol = source file name without directory or extension; r"\." avoids an invalid string escape.
factors = (
    factors
    .withColumn("Symbol", fun.input_file_name())
    .withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1))
    .withColumn("Symbol", fun.element_at(fun.split("Symbol", r"\."), 1))
)
# Same 'dd-MMM-yy' date parsing (relies on the session's legacy timeParserPolicy being set earlier)
# and the same inclusive 2009-10-23 .. 2014-10-23 window as the stock data.
factors = (
    factors
    .withColumn('Date', fun.to_date(fun.to_timestamp(fun.col('Date'), 'dd-MMM-yy')))
    .filter(fun.col('Date').between(datetime(2009, 10, 23), datetime(2014, 10, 23)))
)

# Convert to Pandas DataFrame: collect both Spark results onto the driver
stocks_pd_df, factors_pd_df = stocks.toPandas(), factors.toPandas()

# Calculate rolling stock and factor returns over windows of n_steps rows
n_steps = 10
def my_fun(x):
    """Fractional change from the first to the last element of a rolling window.

    Args:
        x: the window as a pandas Series (accessed positionally via .iloc).

    Returns:
        (last - first) / first. A zero first value yields inf or NaN.
    """
    first, last = x.iloc[0], x.iloc[-1]
    return (last - first) / first

# Rolling return of each symbol's Close over windows of n_steps rows (via my_fun).
# groupby().rolling() labels results by (Symbol, original row label). reset_index() exposes that
# row label as 'level_1', and sorting on it restores the source frame's row order before the
# index is renumbered 0..n-1.
# NOTE(review): windows follow the row order of the collected data, and nothing here sorts by Date.
# Confirm each symbol's rows arrive in chronological order.
stock_returns, factors_returns = (
    source.groupby('Symbol').Close
          .rolling(window=n_steps)
          .apply(my_fun)
          .reset_index()
          .sort_values('level_1')
          .reset_index(drop=True)
    for source in (stocks_pd_df, factors_pd_df)
)

# Combine returns with original data.
# assign() aligns on index labels: this relies on stocks_pd_df / factors_pd_df having the default
# 0..n-1 index from toPandas(), matching the row order the rolling results were sorted back into.
stocks_pd_df_with_returns = stocks_pd_df.assign(stock_returns=stock_returns['Close'])
factors_pd_df_with_returns = factors_pd_df.assign(factors_returns=factors_returns['Close'],
                                                  factors_returns_squared=factors_returns['Close'] ** 2)
# Wide layout: one row per Date and one column per (value, Symbol) pair. The column names are
# flattened to e.g. "factors_returns_<Symbol>" and "factors_returns_squared_<Symbol>".
factors_pd_df_with_returns = factors_pd_df_with_returns.pivot(index='Date',
                                                              columns='Symbol',
                                                              values=['factors_returns', 'factors_returns_squared'])
factors_pd_df_with_returns.columns = ['_'.join(name_parts) for name_parts in factors_pd_df_with_returns.columns]
factors_pd_df_with_returns = factors_pd_df_with_returns.reset_index()

# Merge stocks and factors data
stocks_factors_combined_df = pd.merge(stocks_pd_df_with_returns, factors_pd_df_with_returns, how="left", on="Date")
# Regression features: every factor column of the wide table (all but the 'Date' key), in table order.
# This is the block of columns the merge appended, selected by name rather than by slicing
# the last N columns, so it stays correct for any number of factor symbols.
feature_columns = [name for name in factors_pd_df_with_returns.columns if name != 'Date']
# Drop rows whose features or target are NaN or +/-inf (e.g. a zero first price in my_fun's
# denominator). Infinities are converted explicitly instead of using the 'mode.use_inf_as_na'
# option, which pandas deprecated in 2.1 and removed in 3.0.
required_columns = feature_columns + ['stock_returns']
is_finite_row = (
    stocks_factors_combined_df[required_columns]
    .replace([float('inf'), float('-inf')], float('nan'))
    .notna()
    .all(axis=1)
)
stocks_factors_combined_df = stocks_factors_combined_df.loc[is_finite_row]

# Function to find OLS coefficients
def find_ols_coef(df):
    """Fit an OLS model of one stock's returns on the factor features.

    Args:
        df: the rows of a single Symbol group, containing 'Symbol', 'stock_returns'
            and every column named in the module-level `feature_columns`.

    Returns:
        [symbol, coef_1, ..., coef_k]: the group's Symbol followed by the fitted
        coefficients, in `feature_columns` order.

    NOTE(review): LinearRegression fits an intercept by default, but the intercept
    is not returned, so downstream code never uses it.
    """
    design = df[feature_columns]
    target = df[['stock_returns']].values
    fitted = LinearRegression().fit(design, target)
    symbol_part = list(df[['Symbol']].values[0])
    return symbol_part + list(fitted.coef_[0])

# Apply linear regression to each stock and unpack every [Symbol, coef, ...] list into one row.
coefs_per_stock = pd.DataFrame(
    stocks_factors_combined_df.groupby('Symbol').apply(find_ols_coef).tolist(),
    columns=['Symbol'] + feature_columns,
)

# Calculate mean and covariance of factor returns.
# Read them from the Date-indexed wide table so each row pairs all factors on the same trading day.
# Slicing per-symbol lists by position (e.g. [1:1040]) does not align days and fails as soon as the
# series lengths differ.
# Use the plain-return columns, kept in table order. That is the order in which they appear in
# `feature_columns`, which is also the order the simulation pairs random draws with coefficients.
factor_return_columns = [
    name for name in factors_pd_df_with_returns.columns
    if name.startswith('factors_returns_') and not name.startswith('factors_returns_squared_')
]
factor_returns_wide = factors_pd_df_with_returns[factor_return_columns].replace(
    [float('inf'), float('-inf')], float('nan'))
# NaN values are skipped: mean() per column, cov() pairwise. They come from the rolling-window
# warm-up rows, days a factor has no data, and the +/-inf values converted above.
factors_returns_cov = factor_returns_wide.cov().to_numpy()
factors_returns_mean = factor_returns_wide.mean()

# Broadcast the fitted coefficients, feature order and factor distribution for use inside the UDF
(b_coefs_per_stock,
 b_feature_columns,
 b_factors_returns_mean,
 b_factors_returns_cov) = (
    spark.sparkContext.broadcast(payload)
    for payload in (coefs_per_stock, feature_columns, factors_returns_mean, factors_returns_cov)
)

# Set up parallel seeds: `parallelism` consecutive seeds spread across `parallelism` partitions.
# Each seed runs int(num_trials / parallelism) trials.
parallelism = 1000
num_trials = 1_000_000
base_seed = 1496
seeds = list(range(base_seed, base_seed + parallelism))
seedsDF = spark.createDataFrame(seeds, IntegerType()).repartition(parallelism)

# Define UDF for simulation
def calculate_trial_return(x):
    """Simulate this seed's share of the Monte Carlo trials.

    Each trial:
      1. draws one vector of factor returns from a multivariate normal with the
         broadcast factor mean and covariance;
      2. builds the features [f_1..f_k, f_1**2..f_k**2] as list(f) + list(f**2);
      3. applies every stock's regression coefficients;
      4. averages the resulting per-stock returns over the stocks.

    Args:
        x: integer seed. The same seed (with the same broadcast values) reproduces
           the same trials.

    Returns:
        list of int(num_trials / parallelism) floats, one stock-averaged return per trial.
    """
    from numpy.random import default_rng  # function-scope import: the import cell stays unchanged

    # Seed ONE generator per call and draw from it for every trial. Calling seed(x) inside
    # the loop would reset the RNG each iteration, so every trial would draw the same factors.
    rng = default_rng(x)
    factors_mean = b_factors_returns_mean.value
    factors_cov = b_factors_returns_cov.value
    coefs_per_stock_df = b_coefs_per_stock.value
    # Loop-invariant (n_stocks, n_features) coefficient matrix, with columns in feature order.
    coef_matrix = coefs_per_stock_df[b_feature_columns.value].to_numpy(dtype=float)
    # Average over the number of stocks (rows). DataFrame.size would be rows * columns,
    # including the Symbol column, which shrinks every return by a constant factor.
    n_stocks = len(coefs_per_stock_df)

    trial_return_list = []
    for _ in range(int(num_trials / parallelism)):
        random_factors = rng.multivariate_normal(factors_mean, factors_cov)
        features = list(random_factors) + list(random_factors ** 2)
        returns_per_stock = coef_matrix.dot(features)
        trial_return_list.append(float(returns_per_stock.sum() / n_stocks))
    return trial_return_list

# `udf` is not among this notebook's imports, so reach it through the `fun`
# (pyspark.sql.functions) alias. A bare `udf(...)` raises NameError.
udf_return = fun.udf(calculate_trial_return, ArrayType(DoubleType()))

# Run trials: each seed row yields a list of trial returns, exploded to one row per trial.
trials = seedsDF.withColumn("trial_return", udf_return(col("value")))
trials = trials.select('value', explode('trial_return').alias('trial_return'))
trials.cache()

# Calculate the 5% quantile (relativeError=0.0 requests the exact quantile) and the mean return
# of the bottom 5% of trials.
quantile_5 = trials.approxQuantile('trial_return', [0.05], 0.0)
# `.show()` only prints and returns None; `.first()[0]` keeps the value in `avg_low_return`.
avg_low_return = (
    trials.orderBy(col('trial_return').asc())
    .limit(int(trials.count() / 20))
    .agg(fun.avg(col("trial_return")))
    .first()[0]
)
print(f"5% quantile of trial returns: {quantile_5}")
print(f"Mean of the worst 5% of trial returns: {avg_low_return}")

# Plot the distribution of trial returns: a histogram of the returns only, excluding the seed column.
mytrials = trials.toPandas()
ax = mytrials['trial_return'].plot.hist(bins=100, title='Distribution of simulated trial returns')
ax.set_xlabel('trial return');


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/sathvik/Documents/pyspark/stocks/ABAX.csv.

In [35]:
# Stop the Spark session once the analysis is finished. Any later cell that uses `spark`,
# broadcasts or cached DataFrames needs a new session first.
spark.stop()